From dd5602dc4f1d2e85f839c1b8adb2fb297e8762b7 Mon Sep 17 00:00:00 2001 From: Nico Mandery Date: Fri, 28 Jul 2023 18:40:45 +0200 Subject: [PATCH 01/43] Clearer comment on Context::get_object_store --- async-nats/src/jetstream/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index c090d29dd..1fd009848 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -894,7 +894,7 @@ impl Context { }) } - /// Creates a new object store bucket. + /// Get an existing object store bucket. /// /// # Examples /// From 97a94989740628c9a2b46f375441b3a94e76d659 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Sun, 30 Jul 2023 18:06:33 +0100 Subject: [PATCH 02/43] Fix potentially ambiguous `.as_ref()` --- async-nats/src/tls.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-nats/src/tls.rs b/async-nats/src/tls.rs index 6ada7dfd2..6dc7b122b 100644 --- a/async-nats/src/tls.rs +++ b/async-nats/src/tls.rs @@ -77,7 +77,7 @@ pub(crate) async fn config_tls(options: &ConnectorOptions) -> io::Result>>() - .as_ref(), + .as_slice(), ); } From 78862ed0581b79ba5772396a20c4a657a09244ac Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 31 Jul 2023 08:38:11 +0200 Subject: [PATCH 03/43] Bump msrv to 1.67.0 Signed-off-by: Tomasz Pietrek --- .github/workflows/test.yml | 4 ++-- async-nats/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4071d5a6c..cb5d75e00 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -183,8 +183,8 @@ jobs: run: | set -eo pipefail echo "msrv check" - rustup install 1.65.0 - cargo +1.65.0 check + rustup install 1.67.0 + cargo +1.67.0 check check_examples: name: check (examples) diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index 8e55cd322..d41e0d77a 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -3,7 +3,7 @@ name = "async-nats" authors = ["Tomasz Pietrek ", "Casper Beyer "] version = "0.31.0" edition = "2021" -rust = "1.64.0" +rust = "1.67.0" description = "A async Rust NATS client" license = "Apache-2.0" documentation = "https://docs.rs/async-nats" From d3892aa1c3529ad6c2bd3dc0a48901e6df68c9cc Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 31 Jul 2023 09:16:37 +0200 Subject: [PATCH 04/43] Bump dependencies Signed-off-by: Tomasz Pietrek --- async-nats/Cargo.toml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index d41e0d77a..2867e7496 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -16,22 +16,22 @@ categories = ["network-programming", "api-bindings"] [dependencies] memchr = "2.4" bytes = { version = "1.4.0", features = ["serde"] } -futures = { version = "0.3.26", default-features = false, features = ["std", "async-await"] } +futures = { version = "0.3.28", default-features = false, features = ["std", "async-await"] } nkeys = "0.3.0" -once_cell = "1.17.1" -regex = "1.7.1" -serde = { version = "1.0.152", features = ["derive"] } -serde_json = "1.0.93" -serde_repr = "0.1.10" +once_cell = "1.18.0" +regex = "1.9.1" +serde = { version = "1.0.179", features = ["derive"] } +serde_json = "1.0.104" +serde_repr = "0.1.16" http = "0.2.9" -tokio = { version = "1.25.0", features = ["macros", "rt", "fs", "net", "sync", "time", "io-util"] } +tokio = { version = "1.29.0", features = ["macros", "rt", "fs", 
"net", "sync", "time", "io-util"] } itoa = "1" url = { version = "2"} tokio-rustls = "0.24" rustls-pemfile = "1.0.2" nuid = "0.4.1" serde_nanos = "0.1.3" -time = { version = "0.3.20", features = ["parsing", "formatting", "serde", "serde-well-known"] } +time = { version = "0.3.24", features = ["parsing", "formatting", "serde", "serde-well-known"] } rustls-native-certs = "0.6" tracing = "0.1" thiserror = "1.0" @@ -39,7 +39,7 @@ base64 = "0.21" tokio-retry = "0.3" ring = "0.16" rand = "0.8" -webpki = { package = "rustls-webpki", version = "0.101.1", features = ["alloc", "std"] } +webpki = { package = "rustls-webpki", version = "0.101.2", features = ["alloc", "std"] } [dev-dependencies] criterion = { version = "0.5", features = ["async_tokio"]} From cc96dae8b5e6ab5738fbf6087defc49a6b4e16e5 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Sun, 30 Jul 2023 10:21:01 +0200 Subject: [PATCH 05/43] Add missing license header Signed-off-by: Tomasz Pietrek --- async-nats/src/error/mod.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/async-nats/src/error/mod.rs b/async-nats/src/error/mod.rs index 0bbf12e14..13cd633af 100644 --- a/async-nats/src/error/mod.rs +++ b/async-nats/src/error/mod.rs @@ -1,3 +1,16 @@ +// Copyright 2020-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::fmt::{Debug, Display}; /// The error type for the NATS client, generic by the kind of error. From 0c2eb67bef624597555dedf1d3213e73e4188eca Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 31 Jul 2023 08:04:07 +0200 Subject: [PATCH 06/43] Move and flatten error module Signed-off-by: Tomasz Pietrek --- async-nats/src/{error/mod.rs => error.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename async-nats/src/{error/mod.rs => error.rs} (100%) diff --git a/async-nats/src/error/mod.rs b/async-nats/src/error.rs similarity index 100% rename from async-nats/src/error/mod.rs rename to async-nats/src/error.rs From 158b6d862c6b0edc6a4a4cc9874fc78bb45eb765 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Fri, 28 Jul 2023 13:34:39 +0200 Subject: [PATCH 07/43] Add update object metadata Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/object_store/mod.rs | 129 +++++++++++++++++++ async-nats/tests/object_store.rs | 37 ++++++ 2 files changed, 166 insertions(+) diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index b5f4be998..74dea1f94 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -486,6 +486,79 @@ impl ObjectStore { .await?; Ok(()) } + + /// Updates [Object] [ObjectMeta]. 
+ /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use async_nats::jetstream::object_store; + /// let client = async_nats::connect("demo.nats.io").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// + /// let mut bucket = jetstream.get_object_store("store").await?; + /// bucket.update_metadata("object", object_store::ObjectMeta{ + /// name: "new_name".to_string(), + /// description: Some("a new description".to_string()), + /// link: None, + /// }).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn update_metadata>( + &self, + object: A, + metadata: ObjectMeta, + ) -> Result { + let mut info = self.info(object.as_ref()).await?; + + info.name = metadata.name; + info.description = metadata.description; + info.link = metadata.link; + + let name = encode_object_name(object.as_ref()); + let subject = format!("$O.{}.M.{}", &self.name, &name); + + let mut headers = HeaderMap::new(); + headers.insert( + NATS_ROLLUP, + ROLLUP_SUBJECT.parse::().map_err(|err| { + UpdateMetadataError::with_source( + UpdateMetadataErrorKind::Other, + format!("failed parsing header: {}", err), + ) + })?, + ); + let data = serde_json::to_vec(&info).map_err(|err| { + UpdateMetadataError::with_source( + UpdateMetadataErrorKind::Other, + format!("failed serializing object info: {}", err), + ) + })?; + + // publish meta. + self.stream + .context + .publish_with_headers(subject, headers, data.into()) + .await + .map_err(|err| { + UpdateMetadataError::with_source( + UpdateMetadataErrorKind::PublishMetadata, + format!("failed publishing metadata: {}", err), + ) + })? + .await + .map_err(|err| { + UpdateMetadataError::with_source( + UpdateMetadataErrorKind::PublishMetadata, + format!("failed ack from metadata publish: {}", err), + ) + })?; + + Ok(info) + } } pub struct Watch<'a> { @@ -742,6 +815,62 @@ impl From<&str> for ObjectMeta { } } +impl From for ObjectMeta { + fn from(info: ObjectInfo) -> Self { + ObjectMeta { + name: info.name, + description: info.description, + link: info.link, + } + } +} + +#[derive(Debug)] +pub struct UpdateMetadataError { + kind: UpdateMetadataErrorKind, + source: Option>, +} + +crate::error_impls!(UpdateMetadataError, UpdateMetadataErrorKind); + +impl Display for UpdateMetadataError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.kind() { + UpdateMetadataErrorKind::InvalidName => write!(f, "invalid object name"), + UpdateMetadataErrorKind::NotFound => write!(f, "object not found"), + UpdateMetadataErrorKind::TimedOut => write!(f, "timed out"), + UpdateMetadataErrorKind::Other => write!(f, "error: {}", self.format_source()), + UpdateMetadataErrorKind::PublishMetadata => { + write!(f, "failed publishing metadata: {}", self.format_source()) + } + } + } +} + +impl From for UpdateMetadataError { + fn from(error: InfoError) -> Self { + match error.kind() { + InfoErrorKind::InvalidName => { + UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName) + } + InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound), + InfoErrorKind::Other => { + UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error) + } + InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut), + } + } +} + +#[derive(Debug, PartialEq, Clone)] +pub enum UpdateMetadataErrorKind { + InvalidName, + NotFound, + TimedOut, + Other, + PublishMetadata, +} + #[derive(Clone, Copy, Debug, PartialEq)] pub enum InfoErrorKind { 
InvalidName, diff --git a/async-nats/tests/object_store.rs b/async-nats/tests/object_store.rs index abbe6995f..58c10f547 100644 --- a/async-nats/tests/object_store.rs +++ b/async-nats/tests/object_store.rs @@ -343,4 +343,41 @@ mod object_store { .await .unwrap(); } + + #[tokio::test] + async fn update_metadata() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let jetstream = async_nats::jetstream::new(client); + + let bucket = jetstream + .create_object_store(async_nats::jetstream::object_store::Config { + bucket: "bucket".to_string(), + ..Default::default() + }) + .await + .unwrap(); + bucket + .put("DATA", &mut "some data".as_bytes()) + .await + .unwrap(); + + let given_metadata = ObjectMeta { + name: "data".to_owned(), + description: Some("description".to_string()), + link: None, + }; + + bucket + .update_metadata("DATA", given_metadata.clone()) + .await + .unwrap(); + + let info = bucket.info("DATA").await.unwrap(); + + assert_eq!(info.name, given_metadata.name); + assert_eq!(info.description, given_metadata.description); + assert_eq!(info.link, given_metadata.link); + } } From 932aa8b3231cda8dc9cfe74417344dcfdaa9074a Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Sat, 29 Jul 2023 13:31:29 +0200 Subject: [PATCH 08/43] Properly handle object name update Signed-off-by: Tomasz Pietrek --- .config/nats.dic | 1 + async-nats/src/jetstream/object_store/mod.rs | 104 ++++++++++++++----- async-nats/tests/object_store.rs | 30 +++++- 3 files changed, 102 insertions(+), 33 deletions(-) diff --git a/.config/nats.dic b/.config/nats.dic index b3adc9f98..e2e2b0174 100644 --- a/.config/nats.dic +++ b/.config/nats.dic @@ -140,3 +140,4 @@ filter_subject filter_subjects rollup IoT +ObjectMeta diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index 74dea1f94..673547142 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -499,11 +499,16 @@ impl ObjectStore { /// let jetstream = async_nats::jetstream::new(client); /// /// let mut bucket = jetstream.get_object_store("store").await?; - /// bucket.update_metadata("object", object_store::ObjectMeta{ - /// name: "new_name".to_string(), - /// description: Some("a new description".to_string()), - /// link: None, - /// }).await?; + /// bucket + /// .update_metadata( + /// "object", + /// object_store::ObjectMeta { + /// name: "new_name".to_string(), + /// description: Some("a new description".to_string()), + /// link: None, + /// }, + /// ) + /// .await?; /// # Ok(()) /// # } /// ``` @@ -514,11 +519,54 @@ impl ObjectStore { ) -> Result { let mut info = self.info(object.as_ref()).await?; + // If name is being update, we need to check if other metadata with it already exists. + // If does, error. Otherwise, purge old name metadata. 
+ if metadata.name != info.name { + tracing::info!("new metadata name is different than then old one"); + if !is_valid_object_name(&metadata.name) { + return Err(UpdateMetadataError::new( + UpdateMetadataErrorKind::InvalidName, + )); + } + match self.info(&metadata.name).await { + Ok(_) => { + return Err(UpdateMetadataError::new( + UpdateMetadataErrorKind::NameAlreadyInUse, + )) + } + Err(err) => match err.kind() { + InfoErrorKind::NotFound => { + tracing::info!("purging old metadata: {}", info.name); + self.stream + .purge() + .filter(format!( + "$O.{}.M.{}", + self.name, + encode_object_name(&info.name) + )) + .await + .map_err(|err| { + UpdateMetadataError::with_source( + UpdateMetadataErrorKind::Purge, + err, + ) + })?; + } + _ => { + return Err(UpdateMetadataError::with_source( + UpdateMetadataErrorKind::Other, + err, + )) + } + }, + } + } + info.name = metadata.name; info.description = metadata.description; info.link = metadata.link; - let name = encode_object_name(object.as_ref()); + let name = encode_object_name(&info.name); let subject = format!("$O.{}.M.{}", &self.name, &name); let mut headers = HeaderMap::new(); @@ -825,24 +873,31 @@ impl From for ObjectMeta { } } -#[derive(Debug)] -pub struct UpdateMetadataError { - kind: UpdateMetadataErrorKind, - source: Option>, +#[derive(Debug, PartialEq, Clone)] +pub enum UpdateMetadataErrorKind { + InvalidName, + NotFound, + TimedOut, + Other, + PublishMetadata, + NameAlreadyInUse, + Purge, } -crate::error_impls!(UpdateMetadataError, UpdateMetadataErrorKind); - -impl Display for UpdateMetadataError { +impl Display for UpdateMetadataErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind() { - UpdateMetadataErrorKind::InvalidName => write!(f, "invalid object name"), - UpdateMetadataErrorKind::NotFound => write!(f, "object not found"), - UpdateMetadataErrorKind::TimedOut => write!(f, "timed out"), - UpdateMetadataErrorKind::Other => write!(f, "error: {}", self.format_source()), - UpdateMetadataErrorKind::PublishMetadata => { - write!(f, "failed publishing metadata: {}", self.format_source()) + match self { + Self::InvalidName => write!(f, "invalid object name"), + Self::NotFound => write!(f, "object not found"), + Self::TimedOut => write!(f, "timed out"), + Self::Other => write!(f, "error"), + Self::PublishMetadata => { + write!(f, "failed publishing metadata") + } + Self::NameAlreadyInUse => { + write!(f, "object with updated name already exists") } + Self::Purge => write!(f, "failed purging old name metadata"), } } } @@ -862,14 +917,7 @@ impl From for UpdateMetadataError { } } -#[derive(Debug, PartialEq, Clone)] -pub enum UpdateMetadataErrorKind { - InvalidName, - NotFound, - TimedOut, - Other, - PublishMetadata, -} +pub type UpdateMetadataError = Error; #[derive(Clone, Copy, Debug, PartialEq)] pub enum InfoErrorKind { diff --git a/async-nats/tests/object_store.rs b/async-nats/tests/object_store.rs index 58c10f547..c18f49c83 100644 --- a/async-nats/tests/object_store.rs +++ b/async-nats/tests/object_store.rs @@ -15,7 +15,7 @@ mod object_store { use std::{io, time::Duration}; - use async_nats::jetstream::object_store::ObjectMeta; + use async_nats::jetstream::{object_store::ObjectMeta, stream::DirectGetErrorKind}; use base64::Engine; use futures::StreamExt; use rand::RngCore; @@ -359,22 +359,42 @@ mod object_store { .await .unwrap(); bucket - .put("DATA", &mut "some data".as_bytes()) + .put("old_object", &mut "some data".as_bytes()) .await .unwrap(); let given_metadata = ObjectMeta { - name: 
"data".to_owned(), + name: "new_object".to_owned(), description: Some("description".to_string()), link: None, }; bucket - .update_metadata("DATA", given_metadata.clone()) + .update_metadata("old_object", given_metadata.clone()) + .await + .unwrap(); + + let stream = jetstream.get_stream("OBJ_bucket").await.unwrap(); + + stream + .direct_get_last_for_subject(format!( + "$O.bucket.M.{}", + base64::engine::general_purpose::URL_SAFE.encode("new_object") + )) .await .unwrap(); - let info = bucket.info("DATA").await.unwrap(); + let old_meta_subject = stream + .direct_get_last_for_subject(format!( + "$O.bucket.M.{}", + base64::engine::general_purpose::URL_SAFE.encode("old_object") + )) + .await + .unwrap_err(); + + assert_eq!(old_meta_subject.kind(), DirectGetErrorKind::NotFound); + + let info = bucket.info("new_object").await.unwrap(); assert_eq!(info.name, given_metadata.name); assert_eq!(info.description, given_metadata.description); From 6d38be99a6b73ae3ee5cf8093bf7acc2dfe1af58 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Tue, 1 Aug 2023 09:34:42 +0200 Subject: [PATCH 09/43] Remove link from metadata update Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/object_store/mod.rs | 7 +------ async-nats/tests/object_store.rs | 3 --- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index 673547142..9d1b640df 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -321,7 +321,7 @@ impl ObjectStore { let object_info = ObjectInfo { name: object_meta.name, description: object_meta.description, - link: object_meta.link, + link: None, bucket: self.name.clone(), nuid: object_nuid, chunks: object_chunks, @@ -505,7 +505,6 @@ impl ObjectStore { /// object_store::ObjectMeta { /// name: "new_name".to_string(), /// description: Some("a new description".to_string()), - /// link: None, /// }, /// ) /// .await?; @@ -564,7 +563,6 @@ impl ObjectStore { info.name = metadata.name; info.description = metadata.description; - info.link = metadata.link; let name = encode_object_name(&info.name); let subject = format!("$O.{}.M.{}", &self.name, &name); @@ -850,8 +848,6 @@ pub struct ObjectMeta { pub name: String, /// A short human readable description of the object. pub description: Option, - /// Link this object points to, if any. 
- pub link: Option, } impl From<&str> for ObjectMeta { @@ -868,7 +864,6 @@ impl From for ObjectMeta { ObjectMeta { name: info.name, description: info.description, - link: info.link, } } } diff --git a/async-nats/tests/object_store.rs b/async-nats/tests/object_store.rs index c18f49c83..f4fcf74ba 100644 --- a/async-nats/tests/object_store.rs +++ b/async-nats/tests/object_store.rs @@ -282,7 +282,6 @@ mod object_store { ObjectMeta { name: "Foo".to_string(), description: Some("foo desc".to_string()), - ..Default::default() }, &mut "dadada".as_bytes(), ) @@ -366,7 +365,6 @@ mod object_store { let given_metadata = ObjectMeta { name: "new_object".to_owned(), description: Some("description".to_string()), - link: None, }; bucket @@ -398,6 +396,5 @@ mod object_store { assert_eq!(info.name, given_metadata.name); assert_eq!(info.description, given_metadata.description); - assert_eq!(info.link, given_metadata.link); } } From 83c933cc31344bf56e0fa545a75d366d9d5e5446 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Wed, 2 Aug 2023 20:35:07 +0200 Subject: [PATCH 10/43] Fix rustls deprecation warning --- async-nats/src/tls.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-nats/src/tls.rs b/async-nats/src/tls.rs index 6dc7b122b..c00f37c63 100644 --- a/async-nats/src/tls.rs +++ b/async-nats/src/tls.rs @@ -105,7 +105,7 @@ pub(crate) async fn config_tls(options: &ConnectorOptions) -> io::Result Date: Wed, 2 Aug 2023 20:32:29 +0200 Subject: [PATCH 11/43] Remove some allocations from benchmarks --- async-nats/benches/core_nats.rs | 37 +++++++++++++-------------------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/async-nats/benches/core_nats.rs b/async-nats/benches/core_nats.rs index ca3a275e0..dde283cad 100644 --- a/async-nats/benches/core_nats.rs +++ b/async-nats/benches/core_nats.rs @@ -1,27 +1,28 @@ +use bytes::Bytes; use criterion::{criterion_group, criterion_main, Criterion}; use futures::stream::StreamExt; +static MSG: &[u8] = &[22; 32768]; + pub fn publish(c: &mut Criterion) { let server = nats_server::run_basic_server(); let mut throughput_group = c.benchmark_group("async-nats: publish throughput"); throughput_group.sample_size(30); throughput_group.warm_up_time(std::time::Duration::from_secs(1)); - let bmsg: Vec = (0..32768).map(|_| 22).collect(); - for size in [32, 1024, 8192].iter() { - throughput_group.throughput(criterion::Throughput::Bytes(*size as u64 * 100)); + for &size in [32, 1024, 8192].iter() { + throughput_group.throughput(criterion::Throughput::Bytes(size as u64 * 100)); throughput_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), - size, + &size, |b, _| { let rt = tokio::runtime::Runtime::new().unwrap(); let nc = rt.block_on(async { async_nats::connect(server.client_url()).await.unwrap() }); - let msg = &bmsg[0..*size]; b.to_async(rt).iter(move || { let nc = nc.clone(); - async move { publish_messages(nc, msg, 100).await } + async move { publish_messages(nc, Bytes::from_static(&MSG[..size]), 100).await } }); }, ); @@ -32,12 +33,11 @@ pub fn publish(c: &mut Criterion) { messages_group.sample_size(30); messages_group.warm_up_time(std::time::Duration::from_secs(1)); - let bmsg: Vec = (0..32768).map(|_| 22).collect(); - for size in [32, 1024, 8192].iter() { + for &size in [32, 1024, 8192].iter() { messages_group.throughput(criterion::Throughput::Elements(100)); messages_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), - size, + &size, |b, _| { let rt = tokio::runtime::Runtime::new().unwrap(); let nc = 
rt.block_on(async { @@ -46,11 +46,10 @@ pub fn publish(c: &mut Criterion) { nc.flush().await.unwrap(); nc }); - let msg = &bmsg[0..*size]; b.to_async(rt).iter(move || { let nc = nc.clone(); - async move { publish_messages(nc, msg, 100).await } + async move { publish_messages(nc, Bytes::from_static(&MSG[..size]), 100).await } }); }, ); @@ -65,11 +64,11 @@ pub fn subscribe(c: &mut Criterion) { subscribe_amount_group.sample_size(30); subscribe_amount_group.warm_up_time(std::time::Duration::from_secs(1)); - for size in [32, 1024, 8192].iter() { + for &size in [32, 1024, 8192].iter() { subscribe_amount_group.throughput(criterion::Throughput::Elements(100)); subscribe_amount_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), - size, + &size, |b, _| { let rt = tokio::runtime::Runtime::new().unwrap(); let nc = rt.block_on(async { @@ -78,11 +77,8 @@ pub fn subscribe(c: &mut Criterion) { tokio::task::spawn({ let nc = nc.clone(); async move { - let bmsg: Vec = (0..32768).map(|_| 22).collect(); - let msg = &bmsg[0..*size].to_vec(); - loop { - nc.publish("bench".to_string(), msg.clone().into()) + nc.publish("bench".to_string(), Bytes::from_static(&MSG[..size])) .await .unwrap(); } @@ -102,12 +98,9 @@ pub fn subscribe(c: &mut Criterion) { } subscribe_amount_group.finish(); } -async fn publish_messages(nc: async_nats::Client, msg: &'_ [u8], amount: usize) { - let msg = msg.to_vec(); +async fn publish_messages(nc: async_nats::Client, msg: Bytes, amount: usize) { for _i in 0..amount { - nc.publish("bench".into(), msg.clone().into()) - .await - .unwrap(); + nc.publish("bench".into(), msg.clone()).await.unwrap(); } nc.flush().await.unwrap(); } From ff7e1dd84b87ebdc8f4fabd18452e931c4bb2d58 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 2 Aug 2023 20:26:04 +0300 Subject: [PATCH 12/43] Introduce `get` and `get_all` semantics to `HeaderMap` --- async-nats/src/header.rs | 135 ++++++++++++++-------- async-nats/src/jetstream/consumer/pull.rs | 12 +- async-nats/src/jetstream/consumer/push.rs | 14 +-- async-nats/src/jetstream/kv/mod.rs | 35 ++---- async-nats/tests/jetstream_tests.rs | 20 ++-- async-nats/tests/service_tests.rs | 8 +- 6 files changed, 116 insertions(+), 108 deletions(-) diff --git a/async-nats/src/header.rs b/async-nats/src/header.rs index d0aa63906..b6eada26d 100644 --- a/async-nats/src/header.rs +++ b/async-nats/src/header.rs @@ -20,7 +20,7 @@ //! NATS [Message][crate::Message] headers, modeled loosely after the [http::header] crate. -use std::{collections::HashMap, fmt, slice, str::FromStr}; +use std::{collections::HashMap, fmt, slice::Iter, str::FromStr}; use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -46,7 +46,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, PartialEq, Eq, Debug, Default, Deserialize, Serialize)] pub struct HeaderMap { - inner: HashMap, + inner: HashMap>, } impl FromIterator<(HeaderName, HeaderValue)> for HeaderMap { @@ -60,11 +60,23 @@ impl FromIterator<(HeaderName, HeaderValue)> for HeaderMap { } impl HeaderMap { - pub fn iter(&self) -> std::collections::hash_map::Iter<'_, HeaderName, HeaderValue> { + pub fn iter(&self) -> std::collections::hash_map::Iter<'_, HeaderName, Vec> { self.inner.iter() } } +pub struct GetAll<'a, T> { + inner: Iter<'a, T>, +} + +impl<'a, T> Iterator for GetAll<'a, T> { + type Item = &'a T; + + fn next(&mut self) -> Option { + self.inner.next() + } +} + impl HeaderMap { /// Create an empty `HeaderMap`. 
/// @@ -113,7 +125,7 @@ impl HeaderMap { /// ``` pub fn insert(&mut self, name: K, value: V) { self.inner - .insert(name.into_header_name(), value.into_header_value()); + .insert(name.into_header_name(), vec![value.into_header_value()]); } /// Appends a new value to the list of values to a given key. @@ -127,13 +139,14 @@ impl HeaderMap { /// let mut headers = HeaderMap::new(); /// headers.append("Key", "Value"); /// headers.append("Key", "Another"); - /// ``` pub fn append(&mut self, name: K, value: V) { let key = name.into_header_name(); let v = self.inner.get_mut(&key); match v { Some(v) => { - v.value.push(value.to_string()); + v.push(HeaderValue { + inner: value.to_string(), + }); } None => { self.insert(key, value.to_string().into_header_value()); @@ -150,10 +163,36 @@ impl HeaderMap { /// /// let mut headers = HeaderMap::new(); /// headers.append("Key", "Value"); - /// let key = headers.get("Key").unwrap(); + /// let values = headers.get("Key").unwrap(); + /// ``` + pub fn get(&self, key: K) -> Option<&HeaderValue> { + self.inner + .get(&key.into_header_name()) + .and_then(|x| x.first()) + } + + /// Gets an iterator to the values for a given key. + /// + /// # Examples + /// + /// ``` + /// # use async_nats::HeaderMap; + /// + /// let mut headers = HeaderMap::new(); + /// headers.append("Key", "Value1"); + /// headers.append("Key", "Value2"); + /// let mut values = headers.get_all("Key"); + /// let value1 = values.next(); + /// let value2 = values.next(); /// ``` - pub fn get(&self, name: T) -> Option<&HeaderValue> { - self.inner.get(&name.into_header_name()) + pub fn get_all(&self, key: K) -> GetAll { + let inner = self + .inner + .get(&key.into_header_name()) + .map(|x| x.iter()) + .unwrap_or([].iter()); + + GetAll { inner } } pub(crate) fn to_bytes(&self) -> Vec { @@ -163,7 +202,7 @@ impl HeaderMap { for v in vs.iter() { buf.extend_from_slice(k.as_str().as_bytes()); buf.extend_from_slice(b": "); - buf.extend_from_slice(v.as_bytes()); + buf.extend_from_slice(v.inner.as_bytes()); buf.extend_from_slice(b"\r\n"); } } @@ -186,15 +225,12 @@ impl HeaderMap { /// ``` #[derive(Clone, PartialEq, Eq, Debug, Default, Serialize, Deserialize)] pub struct HeaderValue { - value: Vec, + inner: String, } impl ToString for HeaderValue { fn to_string(&self) -> String { - self.iter() - .next() - .cloned() - .unwrap_or_else(|| String::from("")) + self.inner.to_string() } } @@ -203,6 +239,7 @@ impl From for String { header.to_string() } } + impl From<&HeaderValue> for String { fn from(header: &HeaderValue) -> Self { header.to_string() @@ -211,11 +248,7 @@ impl From<&HeaderValue> for String { impl<'a> From<&'a HeaderValue> for &'a str { fn from(header: &'a HeaderValue) -> Self { - header - .iter() - .next() - .map(|v| v.as_str()) - .unwrap_or_else(|| "") + header.inner.as_str() } } @@ -227,46 +260,33 @@ impl FromStr for HeaderValue { return Err(ParseHeaderValueError); } - let mut set = HeaderValue::new(); - set.value.push(s.to_string()); - Ok(set) + Ok(HeaderValue { + inner: s.to_string(), + }) } } impl From for HeaderValue { fn from(v: u64) -> Self { - let mut set = HeaderValue::new(); - set.value.push(v.to_string()); - set + Self { + inner: v.to_string(), + } } } + impl From<&str> for HeaderValue { fn from(v: &str) -> Self { - let mut set = HeaderValue::new(); - set.value.push(v.to_string()); - set - } -} - -impl IntoIterator for HeaderValue { - type Item = String; - - type IntoIter = std::vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.value.into_iter() + Self { + inner: 
v.to_string(), + } } } impl HeaderValue { - pub fn new() -> HeaderValue { + pub fn new() -> Self { HeaderValue::default() } - pub fn iter(&self) -> slice::Iter { - self.value.iter() - } - pub fn as_str(&self) -> &str { self.into() } @@ -307,11 +327,12 @@ impl IntoHeaderName for HeaderName { pub trait IntoHeaderValue { fn into_header_value(self) -> HeaderValue; } + impl IntoHeaderValue for &str { fn into_header_value(self) -> HeaderValue { - let mut set = HeaderValue::new(); - set.value.push(self.to_string()); - set + HeaderValue { + inner: self.to_string(), + } } } @@ -561,10 +582,19 @@ mod tests { headers.append("Key", "value"); headers.append("Key", "second_value"); + let mut result = headers.get_all("Key"); + assert_eq!( - headers.get("Key").unwrap().value, - Vec::from_iter(["value".to_string(), "second_value".to_string()]) + result.next().unwrap(), + &HeaderValue::from_str("value").unwrap() ); + + assert_eq!( + result.next().unwrap(), + &HeaderValue::from_str("second_value").unwrap() + ); + + assert_eq!(result.next(), None); } #[test] @@ -589,10 +619,13 @@ mod tests { let mut headers = HeaderMap::new(); headers.insert("Key", "Value"); + let mut result = headers.get_all("Key"); + assert_eq!( - headers.get("Key").unwrap().value, - Vec::from_iter(["Value".to_string()]) + result.next().unwrap(), + &HeaderValue::from_str("Value").unwrap() ); + assert_eq!(result.next(), None); } #[test] diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index 48c308eb6..02f3ac046 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -1128,24 +1128,20 @@ impl futures::Stream for Stream { .headers .as_ref() .and_then(|headers| headers.get("Nats-Pending-Messages")) - .map(|h| h.iter()) - .and_then(|mut i| i.next()) - .map(|e| e.parse::()) - .unwrap_or(Ok(self.batch_config.batch)) + .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse()) .map_err(|err| { MessagesError::with_source(MessagesErrorKind::Other, err) })?; + let pending_bytes = message .headers .as_ref() .and_then(|headers| headers.get("Nats-Pending-Bytes")) - .map(|h| h.iter()) - .and_then(|mut i| i.next()) - .map(|e| e.parse::()) - .unwrap_or(Ok(self.batch_config.max_bytes)) + .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse()) .map_err(|err| { MessagesError::with_source(MessagesErrorKind::Other, err) })?; + debug!( "timeout reached. 
remaining messages: {}, bytes {}", pending_messages, pending_bytes diff --git a/async-nats/src/jetstream/consumer/push.rs b/async-nats/src/jetstream/consumer/push.rs index 10363a4f5..87a59a03f 100644 --- a/async-nats/src/jetstream/consumer/push.rs +++ b/async-nats/src/jetstream/consumer/push.rs @@ -657,14 +657,12 @@ impl<'a> futures::Stream for Ordered<'a> { headers.get(crate::header::NATS_LAST_CONSUMER) { let sequence: u64 = - sequence.iter().next().unwrap().parse().map_err( - |err| { - OrderedError::with_source( - OrderedErrorKind::Other, - err, - ) - }, - )?; + sequence.as_str().parse().map_err(|err| { + OrderedError::with_source( + OrderedErrorKind::Other, + err, + ) + })?; let last_sequence = self.consumer_sequence.load(Ordering::Relaxed); diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 57fe3d7e4..cfb366565 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -56,12 +56,15 @@ fn kv_operation_from_message(message: &Message) -> Result .as_ref() .ok_or_else(|| EntryError::with_source(EntryErrorKind::Other, "missing headers"))?; - headers - .get(KV_OPERATION) - .map(|x| x.iter().next().unwrap().as_str()) - .unwrap_or(KV_OPERATION_PUT) - .parse() - .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err)) + if let Some(op) = headers.get(KV_OPERATION) { + Operation::from_str(op.as_str()) + .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err)) + } else { + Err(EntryError::with_source( + EntryErrorKind::Other, + "missing operation", + )) + } } static VALID_BUCKET_RE: Lazy = Lazy::new(|| Regex::new(r#"\A[a-zA-Z0-9_-]+\z"#).unwrap()); @@ -304,14 +307,7 @@ impl Store { "missing sequence headers", ) })? - .iter() - .next() - .ok_or_else(|| { - EntryError::with_source( - EntryErrorKind::Other, - "did not found sequence header value", - ) - })? + .as_str() .parse() .map_err(|err| { EntryError::with_source( @@ -319,6 +315,7 @@ impl Store { format!("failed to parse headers sequence value: {}", err), ) })?; + let created = headers .get(header::NATS_TIME_STAMP) .ok_or_else(|| { @@ -326,17 +323,9 @@ impl Store { EntryErrorKind::Other, "did not found timestamp header", ) - })? 
- .iter() - .next() - .ok_or_else(|| { - EntryError::with_source( - EntryErrorKind::Other, - "did not found timestamp header value", - ) }) .and_then(|created| { - OffsetDateTime::parse(created, &Rfc3339).map_err(|err| { + OffsetDateTime::parse(created.as_str(), &Rfc3339).map_err(|err| { EntryError::with_source( EntryErrorKind::Other, format!( diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index e5a7fa818..5797f7d7b 100644 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -695,9 +695,8 @@ mod jetstream { .unwrap() .get(header::NATS_SEQUENCE) .unwrap() - .iter() - .next() - .unwrap(); + .as_str(); + assert_eq!(sequence.parse::().unwrap(), publish_ack.sequence); assert_eq!(payload, message.payload.as_ref()); @@ -754,9 +753,8 @@ mod jetstream { .unwrap() .get(header::NATS_SEQUENCE) .unwrap() - .iter() - .next() - .unwrap(); + .as_str(); + assert_eq!(sequence.parse::().unwrap(), publish_ack.sequence); assert_eq!(payload, message.payload.as_ref()); @@ -828,9 +826,8 @@ mod jetstream { .unwrap() .get(header::NATS_SEQUENCE) .unwrap() - .iter() - .next() - .unwrap(); + .as_str(); + assert_eq!(sequence.parse::().unwrap(), publish_ack.sequence); assert_eq!(payload, message.payload.as_ref()); @@ -897,9 +894,8 @@ mod jetstream { .unwrap() .get(header::NATS_SEQUENCE) .unwrap() - .iter() - .next() - .unwrap(); + .as_str(); + assert_eq!(sequence.parse::().unwrap(), publish_ack.sequence); assert_eq!(payload, message.payload.as_ref()); diff --git a/async-nats/tests/service_tests.rs b/async-nats/tests/service_tests.rs index 89b992a63..94f0bf417 100644 --- a/async-nats/tests/service_tests.rs +++ b/async-nats/tests/service_tests.rs @@ -307,9 +307,7 @@ mod service { .unwrap() .get(async_nats::service::NATS_SERVICE_ERROR_CODE) .unwrap() - .iter() - .next() - .unwrap() + .as_str() .parse::() .unwrap(), 503 @@ -320,9 +318,7 @@ mod service { .unwrap() .get(async_nats::service::NATS_SERVICE_ERROR) .unwrap() - .iter() - .next() - .unwrap(), + .to_string(), "error".to_string() ); From c83badc5f1f15fa9bda50e84564321fc8810af45 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Fri, 4 Aug 2023 19:55:00 +0800 Subject: [PATCH 13/43] Change `Header::append` value to `IntoHeaderValue` --- async-nats/src/connection.rs | 4 ++-- async-nats/src/header.rs | 8 +++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/async-nats/src/connection.rs b/async-nats/src/connection.rs index e1104d781..2790de2a1 100644 --- a/async-nats/src/connection.rs +++ b/async-nats/src/connection.rs @@ -22,7 +22,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite}; use bytes::{Buf, BytesMut}; use tokio::io; -use crate::header::{HeaderMap, HeaderName}; +use crate::header::{HeaderMap, HeaderName, IntoHeaderValue}; use crate::status::StatusCode; use crate::{ClientOp, ServerError, ServerOp}; @@ -304,7 +304,7 @@ impl Connection { } value.truncate(value.trim_end().len()); - headers.append(name, value); + headers.append(name, value.into_header_value()); } return Ok(Some(ServerOp::Message { diff --git a/async-nats/src/header.rs b/async-nats/src/header.rs index b6eada26d..e3ead49c2 100644 --- a/async-nats/src/header.rs +++ b/async-nats/src/header.rs @@ -139,17 +139,15 @@ impl HeaderMap { /// let mut headers = HeaderMap::new(); /// headers.append("Key", "Value"); /// headers.append("Key", "Another"); - pub fn append(&mut self, name: K, value: V) { + pub fn append(&mut self, name: K, value: V) { let key = name.into_header_name(); let v = self.inner.get_mut(&key); match v { Some(v) 
=> { - v.push(HeaderValue { - inner: value.to_string(), - }); + v.push(value.into_header_value()); } None => { - self.insert(key, value.to_string().into_header_value()); + self.insert(key, value.into_header_value()); } } } From 345f999916cacd7a7a43822ab549701076a51a15 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Fri, 4 Aug 2023 20:11:23 +0800 Subject: [PATCH 14/43] Fix header value documentation --- async-nats/src/header.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/async-nats/src/header.rs b/async-nats/src/header.rs index e3ead49c2..79a4ba8f8 100644 --- a/async-nats/src/header.rs +++ b/async-nats/src/header.rs @@ -209,8 +209,7 @@ impl HeaderMap { } } -/// A struct representing value of a given header. -/// Can contain one or more elements. +/// Represents NATS header field value. /// /// # Examples /// From 5771a77e0861cfdf00546701887b536c391e0e15 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Tue, 1 Aug 2023 12:43:07 +0200 Subject: [PATCH 15/43] Use bigger bench subscribe messages batch Signed-off-by: Tomasz Pietrek --- async-nats/benches/core_nats.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/async-nats/benches/core_nats.rs b/async-nats/benches/core_nats.rs index dde283cad..022bcb69b 100644 --- a/async-nats/benches/core_nats.rs +++ b/async-nats/benches/core_nats.rs @@ -59,13 +59,14 @@ pub fn publish(c: &mut Criterion) { pub fn subscribe(c: &mut Criterion) { let server = nats_server::run_basic_server(); + let messages_per_subscribe = 100_000; let mut subscribe_amount_group = c.benchmark_group("subscribe amount"); subscribe_amount_group.sample_size(30); subscribe_amount_group.warm_up_time(std::time::Duration::from_secs(1)); for &size in [32, 1024, 8192].iter() { - subscribe_amount_group.throughput(criterion::Throughput::Elements(100)); + subscribe_amount_group.throughput(criterion::Throughput::Elements(messages_per_subscribe)); subscribe_amount_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), &size, @@ -91,7 +92,7 @@ pub fn subscribe(c: &mut Criterion) { b.to_async(rt).iter(move || { let nc = nc.clone(); - async move { subscribe_messages(nc, 100).await } + async move { subscribe_messages(nc, messages_per_subscribe).await } }); }, ); @@ -105,7 +106,7 @@ async fn publish_messages(nc: async_nats::Client, msg: Bytes, amount: usize) { nc.flush().await.unwrap(); } -async fn subscribe_messages(nc: async_nats::Client, amount: usize) { +async fn subscribe_messages(nc: async_nats::Client, amount: u64) { let mut sub = nc.subscribe("bench".into()).await.unwrap(); for _ in 0..amount { sub.next().await.unwrap(); From bd3e3d371111d69ff5c68075d22a7500eec7e264 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Tue, 1 Aug 2023 19:53:08 +0200 Subject: [PATCH 16/43] Improve subscribe benchmark Signed-off-by: Tomasz Pietrek --- async-nats/benches/core_nats.rs | 37 +++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/async-nats/benches/core_nats.rs b/async-nats/benches/core_nats.rs index 022bcb69b..d4393333f 100644 --- a/async-nats/benches/core_nats.rs +++ b/async-nats/benches/core_nats.rs @@ -59,27 +59,40 @@ pub fn publish(c: &mut Criterion) { pub fn subscribe(c: &mut Criterion) { let server = nats_server::run_basic_server(); - let messages_per_subscribe = 100_000; + let messages_per_subscribe = 1_000_000; let mut subscribe_amount_group = c.benchmark_group("subscribe amount"); - subscribe_amount_group.sample_size(30); - 
subscribe_amount_group.warm_up_time(std::time::Duration::from_secs(1)); + subscribe_amount_group.sample_size(10); for &size in [32, 1024, 8192].iter() { + let url = server.client_url(); subscribe_amount_group.throughput(criterion::Throughput::Elements(messages_per_subscribe)); subscribe_amount_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), &size, - |b, _| { + move |b, _| { let rt = tokio::runtime::Runtime::new().unwrap(); - let nc = rt.block_on(async { - let nc = async_nats::connect(server.client_url()).await.unwrap(); - - tokio::task::spawn({ - let nc = nc.clone(); + let url = url.clone(); + let (nc, handle) = rt.block_on(async move { + let nc = async_nats::ConnectOptions::new() + .connect(url.clone()) + .await + .unwrap(); + let (started, ready) = tokio::sync::oneshot::channel(); + let handle = tokio::task::spawn({ async move { + let client = async_nats::ConnectOptions::new() + .connect(url) + .await + .unwrap(); + + let bmsg: Vec = (0..32768).map(|_| 22).collect(); + let msg = &bmsg[0..*size].to_vec(); + + started.send(()).unwrap(); loop { - nc.publish("bench".to_string(), Bytes::from_static(&MSG[..size])) + client + .publish("bench".to_string(), Bytes::from_static(&MSG[..size])) .await .unwrap(); } @@ -87,13 +100,15 @@ pub fn subscribe(c: &mut Criterion) { }); nc.publish("data".to_string(), "data".into()).await.unwrap(); nc.flush().await.unwrap(); - nc + ready.await.unwrap(); + (nc, handle) }); b.to_async(rt).iter(move || { let nc = nc.clone(); async move { subscribe_messages(nc, messages_per_subscribe).await } }); + handle.abort(); }, ); } From 9c57c776cf8c9716ff54f18d2e96c8414f80795f Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Fri, 4 Aug 2023 09:19:49 +0200 Subject: [PATCH 17/43] Adjust benchmark elements per iteration Signed-off-by: Tomasz Pietrek --- async-nats/benches/core_nats.rs | 95 ++++++++++++++++++++++++++++----- 1 file changed, 83 insertions(+), 12 deletions(-) diff --git a/async-nats/benches/core_nats.rs b/async-nats/benches/core_nats.rs index d4393333f..9c4e83c14 100644 --- a/async-nats/benches/core_nats.rs +++ b/async-nats/benches/core_nats.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use bytes::Bytes; use criterion::{criterion_group, criterion_main, Criterion}; use futures::stream::StreamExt; @@ -5,13 +7,14 @@ use futures::stream::StreamExt; static MSG: &[u8] = &[22; 32768]; pub fn publish(c: &mut Criterion) { + let messages_amount = 500_000; let server = nats_server::run_basic_server(); let mut throughput_group = c.benchmark_group("async-nats: publish throughput"); - throughput_group.sample_size(30); + throughput_group.sample_size(10); throughput_group.warm_up_time(std::time::Duration::from_secs(1)); for &size in [32, 1024, 8192].iter() { - throughput_group.throughput(criterion::Throughput::Bytes(size as u64 * 100)); + throughput_group.throughput(criterion::Throughput::Bytes(size as u64 * messages_amount)); throughput_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), &size, @@ -22,7 +25,10 @@ pub fn publish(c: &mut Criterion) { b.to_async(rt).iter(move || { let nc = nc.clone(); - async move { publish_messages(nc, Bytes::from_static(&MSG[..size]), 100).await } + async move { + publish_messages(nc, Bytes::from_static(&MSG[..size]), messages_amount) + .await + } }); }, ); @@ -30,11 +36,11 @@ pub fn publish(c: &mut Criterion) { throughput_group.finish(); let mut messages_group = c.benchmark_group("async-nats: publish messages amount"); - messages_group.sample_size(30); + messages_group.sample_size(10); 
messages_group.warm_up_time(std::time::Duration::from_secs(1)); for &size in [32, 1024, 8192].iter() { - messages_group.throughput(criterion::Throughput::Elements(100)); + messages_group.throughput(criterion::Throughput::Elements(messages_amount)); messages_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), &size, @@ -49,7 +55,10 @@ pub fn publish(c: &mut Criterion) { b.to_async(rt).iter(move || { let nc = nc.clone(); - async move { publish_messages(nc, Bytes::from_static(&MSG[..size]), 100).await } + async move { + publish_messages(nc, Bytes::from_static(&MSG[..size]), messages_amount) + .await + } }); }, ); @@ -59,7 +68,7 @@ pub fn publish(c: &mut Criterion) { pub fn subscribe(c: &mut Criterion) { let server = nats_server::run_basic_server(); - let messages_per_subscribe = 1_000_000; + let messages_per_subscribe = 500_000; let mut subscribe_amount_group = c.benchmark_group("subscribe amount"); subscribe_amount_group.sample_size(10); @@ -86,15 +95,12 @@ pub fn subscribe(c: &mut Criterion) { .await .unwrap(); - let bmsg: Vec = (0..32768).map(|_| 22).collect(); - let msg = &bmsg[0..*size].to_vec(); - started.send(()).unwrap(); loop { client .publish("bench".to_string(), Bytes::from_static(&MSG[..size])) .await - .unwrap(); + .ok(); } } }); @@ -114,7 +120,72 @@ pub fn subscribe(c: &mut Criterion) { } subscribe_amount_group.finish(); } -async fn publish_messages(nc: async_nats::Client, msg: Bytes, amount: usize) { + +pub fn request(c: &mut Criterion) { + let server = nats_server::run_basic_server(); + let messages_per_run = 10_000; + + let mut subscribe_amount_group = c.benchmark_group("request amount"); + subscribe_amount_group.sample_size(10); + + for &size in [32, 1024, 8192].iter() { + let url = server.client_url(); + subscribe_amount_group.throughput(criterion::Throughput::Elements(messages_per_run)); + subscribe_amount_group.bench_with_input( + criterion::BenchmarkId::from_parameter(size), + &size, + move |b, _| { + let rt = tokio::runtime::Runtime::new().unwrap(); + let url = url.clone(); + let (nc, handle) = rt.block_on(async move { + let nc = async_nats::ConnectOptions::new() + .connect(url.clone()) + .await + .unwrap(); + let (started, ready) = tokio::sync::oneshot::channel(); + let handle = tokio::task::spawn({ + async move { + let client = async_nats::ConnectOptions::new() + .connect(url) + .await + .unwrap(); + + let mut subscription = client.subscribe("bench".into()).await.unwrap(); + tokio::time::sleep(Duration::from_secs(3)).await; + started.send(()).unwrap(); + + while let Some(request) = subscription.next().await { + client + .publish(request.reply.unwrap(), "".into()) + .await + .unwrap(); + client.flush().await.unwrap(); + } + } + }); + nc.flush().await.unwrap(); + ready.await.unwrap(); + (nc, handle) + }); + b.to_async(rt).iter(move || { + let nc = nc.clone(); + async move { requests(nc, Bytes::from_static(&MSG[..size]), messages_per_run).await } + }); + handle.abort(); + }, + ); + } + subscribe_amount_group.finish(); +} + +async fn requests(nc: async_nats::Client, msg: Bytes, amount: u64) { + for _i in 0..amount { + nc.request("bench".into(), msg.clone()).await.unwrap(); + } + nc.flush().await.unwrap(); +} + +async fn publish_messages(nc: async_nats::Client, msg: Bytes, amount: u64) { for _i in 0..amount { nc.publish("bench".into(), msg.clone()).await.unwrap(); } From f290282386da836a8c7801c3f6ed00d635c8d820 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Fri, 4 Aug 2023 09:41:41 +0200 Subject: [PATCH 18/43] Add request benchmark Signed-off-by: 
Tomasz Pietrek --- async-nats/benches/core_nats.rs | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/async-nats/benches/core_nats.rs b/async-nats/benches/core_nats.rs index 9c4e83c14..451255dfe 100644 --- a/async-nats/benches/core_nats.rs +++ b/async-nats/benches/core_nats.rs @@ -7,14 +7,16 @@ use futures::stream::StreamExt; static MSG: &[u8] = &[22; 32768]; pub fn publish(c: &mut Criterion) { - let messages_amount = 500_000; + let messages_per_iter = 500_000; let server = nats_server::run_basic_server(); let mut throughput_group = c.benchmark_group("async-nats: publish throughput"); throughput_group.sample_size(10); throughput_group.warm_up_time(std::time::Duration::from_secs(1)); for &size in [32, 1024, 8192].iter() { - throughput_group.throughput(criterion::Throughput::Bytes(size as u64 * messages_amount)); + throughput_group.throughput(criterion::Throughput::Bytes( + size as u64 * messages_per_iter, + )); throughput_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), &size, @@ -26,7 +28,7 @@ pub fn publish(c: &mut Criterion) { b.to_async(rt).iter(move || { let nc = nc.clone(); async move { - publish_messages(nc, Bytes::from_static(&MSG[..size]), messages_amount) + publish_messages(nc, Bytes::from_static(&MSG[..size]), messages_per_iter) .await } }); @@ -40,7 +42,7 @@ pub fn publish(c: &mut Criterion) { messages_group.warm_up_time(std::time::Duration::from_secs(1)); for &size in [32, 1024, 8192].iter() { - messages_group.throughput(criterion::Throughput::Elements(messages_amount)); + messages_group.throughput(criterion::Throughput::Elements(messages_per_iter)); messages_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), &size, @@ -56,7 +58,7 @@ pub fn publish(c: &mut Criterion) { b.to_async(rt).iter(move || { let nc = nc.clone(); async move { - publish_messages(nc, Bytes::from_static(&MSG[..size]), messages_amount) + publish_messages(nc, Bytes::from_static(&MSG[..size]), messages_per_iter) .await } }); @@ -68,14 +70,14 @@ pub fn publish(c: &mut Criterion) { pub fn subscribe(c: &mut Criterion) { let server = nats_server::run_basic_server(); - let messages_per_subscribe = 500_000; + let messages_per_iter = 500_000; let mut subscribe_amount_group = c.benchmark_group("subscribe amount"); subscribe_amount_group.sample_size(10); for &size in [32, 1024, 8192].iter() { let url = server.client_url(); - subscribe_amount_group.throughput(criterion::Throughput::Elements(messages_per_subscribe)); + subscribe_amount_group.throughput(criterion::Throughput::Elements(messages_per_iter)); subscribe_amount_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), &size, @@ -112,7 +114,7 @@ pub fn subscribe(c: &mut Criterion) { b.to_async(rt).iter(move || { let nc = nc.clone(); - async move { subscribe_messages(nc, messages_per_subscribe).await } + async move { subscribe_messages(nc, messages_per_iter).await } }); handle.abort(); }, @@ -123,14 +125,14 @@ pub fn subscribe(c: &mut Criterion) { pub fn request(c: &mut Criterion) { let server = nats_server::run_basic_server(); - let messages_per_run = 10_000; + let messages_per_iter = 10_000; let mut subscribe_amount_group = c.benchmark_group("request amount"); subscribe_amount_group.sample_size(10); for &size in [32, 1024, 8192].iter() { let url = server.client_url(); - subscribe_amount_group.throughput(criterion::Throughput::Elements(messages_per_run)); + subscribe_amount_group.throughput(criterion::Throughput::Elements(messages_per_iter)); 
subscribe_amount_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), &size, @@ -169,7 +171,9 @@ pub fn request(c: &mut Criterion) { }); b.to_async(rt).iter(move || { let nc = nc.clone(); - async move { requests(nc, Bytes::from_static(&MSG[..size]), messages_per_run).await } + async move { + requests(nc, Bytes::from_static(&MSG[..size]), messages_per_iter).await + } }); handle.abort(); }, @@ -182,7 +186,6 @@ async fn requests(nc: async_nats::Client, msg: Bytes, amount: u64) { for _i in 0..amount { nc.request("bench".into(), msg.clone()).await.unwrap(); } - nc.flush().await.unwrap(); } async fn publish_messages(nc: async_nats::Client, msg: Bytes, amount: u64) { @@ -199,5 +202,5 @@ async fn subscribe_messages(nc: async_nats::Client, amount: u64) { } } -criterion_group!(benches, publish, subscribe); +criterion_group!(benches, publish, subscribe, request); criterion_main!(benches); From 556c69c3da2cd1ce4f8795434a0d78b5544c4478 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Fri, 4 Aug 2023 15:39:31 +0200 Subject: [PATCH 19/43] Use large drop iter in benchmark Signed-off-by: Tomasz Pietrek --- async-nats/benches/core_nats.rs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/async-nats/benches/core_nats.rs b/async-nats/benches/core_nats.rs index 451255dfe..757c8df4a 100644 --- a/async-nats/benches/core_nats.rs +++ b/async-nats/benches/core_nats.rs @@ -25,7 +25,7 @@ pub fn publish(c: &mut Criterion) { let nc = rt.block_on(async { async_nats::connect(server.client_url()).await.unwrap() }); - b.to_async(rt).iter(move || { + b.to_async(rt).iter_with_large_drop(move || { let nc = nc.clone(); async move { publish_messages(nc, Bytes::from_static(&MSG[..size]), messages_per_iter) @@ -55,7 +55,7 @@ pub fn publish(c: &mut Criterion) { nc }); - b.to_async(rt).iter(move || { + b.to_async(rt).iter_with_large_drop(move || { let nc = nc.clone(); async move { publish_messages(nc, Bytes::from_static(&MSG[..size]), messages_per_iter) @@ -84,13 +84,13 @@ pub fn subscribe(c: &mut Criterion) { move |b, _| { let rt = tokio::runtime::Runtime::new().unwrap(); let url = url.clone(); - let (nc, handle) = rt.block_on(async move { + let nc = rt.block_on(async move { let nc = async_nats::ConnectOptions::new() .connect(url.clone()) .await .unwrap(); let (started, ready) = tokio::sync::oneshot::channel(); - let handle = tokio::task::spawn({ + tokio::task::spawn({ async move { let client = async_nats::ConnectOptions::new() .connect(url) @@ -102,21 +102,20 @@ pub fn subscribe(c: &mut Criterion) { client .publish("bench".to_string(), Bytes::from_static(&MSG[..size])) .await - .ok(); + .unwrap() } } }); nc.publish("data".to_string(), "data".into()).await.unwrap(); nc.flush().await.unwrap(); ready.await.unwrap(); - (nc, handle) + nc }); - b.to_async(rt).iter(move || { + b.to_async(rt).iter_with_large_drop(move || { let nc = nc.clone(); async move { subscribe_messages(nc, messages_per_iter).await } }); - handle.abort(); }, ); } @@ -139,13 +138,13 @@ pub fn request(c: &mut Criterion) { move |b, _| { let rt = tokio::runtime::Runtime::new().unwrap(); let url = url.clone(); - let (nc, handle) = rt.block_on(async move { + let nc = rt.block_on(async move { let nc = async_nats::ConnectOptions::new() .connect(url.clone()) .await .unwrap(); let (started, ready) = tokio::sync::oneshot::channel(); - let handle = tokio::task::spawn({ + tokio::task::spawn({ async move { let client = async_nats::ConnectOptions::new() .connect(url) @@ -167,15 +166,14 @@ pub fn request(c: &mut 
Criterion) { }); nc.flush().await.unwrap(); ready.await.unwrap(); - (nc, handle) + nc }); - b.to_async(rt).iter(move || { + b.to_async(rt).iter_with_large_drop(move || { let nc = nc.clone(); async move { requests(nc, Bytes::from_static(&MSG[..size]), messages_per_iter).await } }); - handle.abort(); }, ); } From 7f25ea6a6640afe1030d3075e70f536b81e21e7e Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Fri, 4 Aug 2023 22:48:46 +0200 Subject: [PATCH 20/43] Add JetStream publish bench Signed-off-by: Tomasz Pietrek --- async-nats/Cargo.toml | 2 +- async-nats/benches/core_nats.rs | 5 +- async-nats/benches/jetstream.rs | 215 ++++++++++++++++++++++++++++++++ async-nats/benches/main.rs | 7 ++ 4 files changed, 225 insertions(+), 4 deletions(-) create mode 100644 async-nats/benches/jetstream.rs create mode 100644 async-nats/benches/main.rs diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index 2867e7496..7f8bb240a 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -58,6 +58,6 @@ slow_tests = [] [[bench]] -name = "core_nats" +name = "main" harness = false lto = true diff --git a/async-nats/benches/core_nats.rs b/async-nats/benches/core_nats.rs index 757c8df4a..ffe368c23 100644 --- a/async-nats/benches/core_nats.rs +++ b/async-nats/benches/core_nats.rs @@ -1,7 +1,7 @@ use std::time::Duration; use bytes::Bytes; -use criterion::{criterion_group, criterion_main, Criterion}; +use criterion::{criterion_group, Criterion}; use futures::stream::StreamExt; static MSG: &[u8] = &[22; 32768]; @@ -200,5 +200,4 @@ async fn subscribe_messages(nc: async_nats::Client, amount: u64) { } } -criterion_group!(benches, publish, subscribe, request); -criterion_main!(benches); +criterion_group!(core_nats, publish, subscribe, request); diff --git a/async-nats/benches/jetstream.rs b/async-nats/benches/jetstream.rs new file mode 100644 index 000000000..12695d6a2 --- /dev/null +++ b/async-nats/benches/jetstream.rs @@ -0,0 +1,215 @@ +use std::future::IntoFuture; + +use async_nats::jetstream::stream; +use bytes::Bytes; +use criterion::{criterion_group, Criterion}; + +static MSG: &[u8] = &[22; 32768]; + +pub fn jetstream_publish_sync(c: &mut Criterion) { + let messages_per_iter = 50_000; + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let mut throughput_group = c.benchmark_group("jetstream sync publish throughput"); + throughput_group.sample_size(10); + throughput_group.warm_up_time(std::time::Duration::from_secs(1)); + + for &size in [32, 1024, 8192].iter() { + throughput_group.throughput(criterion::Throughput::Bytes( + size as u64 * messages_per_iter, + )); + throughput_group.bench_with_input( + criterion::BenchmarkId::from_parameter(size), + &size, + |b, _| { + let rt = tokio::runtime::Runtime::new().unwrap(); + let context = rt.block_on(async { + let context = async_nats::jetstream::new( + async_nats::connect(server.client_url()).await.unwrap(), + ); + + let stream = context + .create_stream(stream::Config { + name: "bench".to_owned(), + subjects: vec!["bench".to_string()], + ..Default::default() + }) + .await + .unwrap(); + stream.purge().await.unwrap(); + context + }); + + b.to_async(rt).iter_with_large_drop(move || { + let nc = context.clone(); + async move { + publish_sync_batch(nc, Bytes::from_static(&MSG[..size]), messages_per_iter) + .await + } + }); + }, + ); + } + throughput_group.finish(); + + let mut messages_group = c.benchmark_group("jetstream sync publish messages amount"); + messages_group.sample_size(10); + 
messages_group.warm_up_time(std::time::Duration::from_secs(1)); + + for &size in [32, 1024, 8192].iter() { + messages_group.throughput(criterion::Throughput::Elements(messages_per_iter)); + messages_group.bench_with_input( + criterion::BenchmarkId::from_parameter(size), + &size, + |b, _| { + let rt = tokio::runtime::Runtime::new().unwrap(); + let context = rt.block_on(async { + let context = async_nats::jetstream::new( + async_nats::connect(server.client_url()).await.unwrap(), + ); + + let stream = context + .create_stream(stream::Config { + name: "bench".to_owned(), + subjects: vec!["bench".to_string()], + ..Default::default() + }) + .await + .unwrap(); + stream.purge().await.unwrap(); + context + }); + + b.to_async(rt).iter_with_large_drop(move || { + let context = context.clone(); + async move { + publish_sync_batch( + context, + Bytes::from_static(&MSG[..size]), + messages_per_iter, + ) + .await + } + }); + }, + ); + } + messages_group.finish(); +} + +pub fn jetstream_publish_async(c: &mut Criterion) { + let messages_per_iter = 50_000; + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let mut throughput_group = c.benchmark_group("jetstream async publish throughput"); + throughput_group.sample_size(10); + throughput_group.warm_up_time(std::time::Duration::from_secs(1)); + + for &size in [32, 1024, 8192].iter() { + throughput_group.throughput(criterion::Throughput::Bytes( + size as u64 * messages_per_iter, + )); + throughput_group.bench_with_input( + criterion::BenchmarkId::from_parameter(size), + &size, + |b, _| { + let rt = tokio::runtime::Runtime::new().unwrap(); + let context = rt.block_on(async { + let context = async_nats::jetstream::new( + async_nats::connect(server.client_url()).await.unwrap(), + ); + + let stream = context + .create_stream(stream::Config { + name: "bench".to_owned(), + subjects: vec!["bench".to_string()], + ..Default::default() + }) + .await + .unwrap(); + stream.purge().await.unwrap(); + context + }); + + b.to_async(rt).iter_with_large_drop(move || { + let nc = context.clone(); + async move { + publish_async_batch(nc, Bytes::from_static(&MSG[..size]), messages_per_iter) + .await + } + }); + }, + ); + } + throughput_group.finish(); + + let mut messages_group = c.benchmark_group("jetstream async publish messages amount"); + messages_group.sample_size(10); + messages_group.warm_up_time(std::time::Duration::from_secs(1)); + + for &size in [32, 1024, 8192].iter() { + messages_group.throughput(criterion::Throughput::Elements(messages_per_iter)); + messages_group.bench_with_input( + criterion::BenchmarkId::from_parameter(size), + &size, + |b, _| { + let rt = tokio::runtime::Runtime::new().unwrap(); + let context = rt.block_on(async { + let context = async_nats::jetstream::new( + async_nats::connect(server.client_url()).await.unwrap(), + ); + + let stream = context + .create_stream(stream::Config { + name: "bench".to_owned(), + subjects: vec!["bench".to_string()], + ..Default::default() + }) + .await + .unwrap(); + stream.purge().await.unwrap(); + context + }); + + b.to_async(rt).iter_with_large_drop(move || { + let context = context.clone(); + async move { + publish_async_batch( + context, + Bytes::from_static(&MSG[..size]), + messages_per_iter, + ) + .await + } + }); + }, + ); + } + messages_group.finish(); +} +async fn publish_sync_batch(context: async_nats::jetstream::Context, msg: Bytes, amount: u64) { + for _i in 0..amount { + context + .publish("bench".into(), msg.clone()) + .await + .unwrap() + .await + .unwrap(); + } +} + +async fn 
publish_async_batch(context: async_nats::jetstream::Context, msg: Bytes, amount: u64) { + // This acts as a semaphore that does not allow for more than 1000 publish acks awaiting. + let (tx, mut rx) = tokio::sync::mpsc::channel(10); + + let handle = tokio::task::spawn(async move { + for _ in 0..amount { + rx.recv().await.unwrap(); + } + }); + for _ in 0..amount { + let ack = context.publish("bench".into(), msg.clone()).await.unwrap(); + tx.send(ack.into_future()).await.unwrap(); + } + handle.await.unwrap(); +} + +criterion_group!(jetstream, jetstream_publish_sync, jetstream_publish_async); diff --git a/async-nats/benches/main.rs b/async-nats/benches/main.rs new file mode 100644 index 000000000..18fba05d3 --- /dev/null +++ b/async-nats/benches/main.rs @@ -0,0 +1,7 @@ +use criterion::criterion_main; + +// Import the benchmark groups from both files +mod core_nats; +mod jetstream; + +criterion_main!(core_nats::core_nats, jetstream::jetstream); From a59b21eabceef45736f6f7ca987987e8c3edb805 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Sat, 5 Aug 2023 07:36:17 +0200 Subject: [PATCH 21/43] Rename benchmarks This was done to allow running a specific bench. Now `cargo bench nats` or event `cargo bench nats::request` can be used. While it does fake the module structure, it is really intuitive to use and prints easy to read benchmark report. Signed-off-by: Tomasz Pietrek --- async-nats/benches/core_nats.rs | 8 ++++---- async-nats/benches/jetstream.rs | 7 ++++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/async-nats/benches/core_nats.rs b/async-nats/benches/core_nats.rs index ffe368c23..f9dc9f99a 100644 --- a/async-nats/benches/core_nats.rs +++ b/async-nats/benches/core_nats.rs @@ -9,7 +9,7 @@ static MSG: &[u8] = &[22; 32768]; pub fn publish(c: &mut Criterion) { let messages_per_iter = 500_000; let server = nats_server::run_basic_server(); - let mut throughput_group = c.benchmark_group("async-nats: publish throughput"); + let mut throughput_group = c.benchmark_group("nats::publish_throughput"); throughput_group.sample_size(10); throughput_group.warm_up_time(std::time::Duration::from_secs(1)); @@ -37,7 +37,7 @@ pub fn publish(c: &mut Criterion) { } throughput_group.finish(); - let mut messages_group = c.benchmark_group("async-nats: publish messages amount"); + let mut messages_group = c.benchmark_group("nats::publish_amount"); messages_group.sample_size(10); messages_group.warm_up_time(std::time::Duration::from_secs(1)); @@ -72,7 +72,7 @@ pub fn subscribe(c: &mut Criterion) { let server = nats_server::run_basic_server(); let messages_per_iter = 500_000; - let mut subscribe_amount_group = c.benchmark_group("subscribe amount"); + let mut subscribe_amount_group = c.benchmark_group("nats::subscribe_amount"); subscribe_amount_group.sample_size(10); for &size in [32, 1024, 8192].iter() { @@ -126,7 +126,7 @@ pub fn request(c: &mut Criterion) { let server = nats_server::run_basic_server(); let messages_per_iter = 10_000; - let mut subscribe_amount_group = c.benchmark_group("request amount"); + let mut subscribe_amount_group = c.benchmark_group("nats::request_amount"); subscribe_amount_group.sample_size(10); for &size in [32, 1024, 8192].iter() { diff --git a/async-nats/benches/jetstream.rs b/async-nats/benches/jetstream.rs index 12695d6a2..fc5fea139 100644 --- a/async-nats/benches/jetstream.rs +++ b/async-nats/benches/jetstream.rs @@ -9,7 +9,7 @@ static MSG: &[u8] = &[22; 32768]; pub fn jetstream_publish_sync(c: &mut Criterion) { let messages_per_iter = 50_000; let server = 
nats_server::run_server("tests/configs/jetstream.conf"); - let mut throughput_group = c.benchmark_group("jetstream sync publish throughput"); + let mut throughput_group = c.benchmark_group("jetstream::sync_publish_throughput"); throughput_group.sample_size(10); throughput_group.warm_up_time(std::time::Duration::from_secs(1)); @@ -141,7 +141,8 @@ pub fn jetstream_publish_async(c: &mut Criterion) { } throughput_group.finish(); - let mut messages_group = c.benchmark_group("jetstream async publish messages amount"); + let mut messages_group = c.benchmark_group("jetstream::async_publish_messages_amount"); + messages_group.sample_size(10); messages_group.warm_up_time(std::time::Duration::from_secs(1)); @@ -197,7 +198,7 @@ async fn publish_sync_batch(context: async_nats::jetstream::Context, msg: Bytes, } async fn publish_async_batch(context: async_nats::jetstream::Context, msg: Bytes, amount: u64) { - // This acts as a semaphore that does not allow for more than 1000 publish acks awaiting. + // This acts as a semaphore that does not allow for more than 10 publish acks awaiting. let (tx, mut rx) = tokio::sync::mpsc::channel(10); let handle = tokio::task::spawn(async move { From b26262bcf3f9c47dcebeb0c2cf708f412d49f570 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Fri, 11 Aug 2023 22:25:59 +0800 Subject: [PATCH 22/43] Implement `fmt::Display` for `HeaderValue` --- async-nats/src/header.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/async-nats/src/header.rs b/async-nats/src/header.rs index 79a4ba8f8..b70af3637 100644 --- a/async-nats/src/header.rs +++ b/async-nats/src/header.rs @@ -225,9 +225,9 @@ pub struct HeaderValue { inner: String, } -impl ToString for HeaderValue { - fn to_string(&self) -> String { - self.inner.to_string() +impl fmt::Display for HeaderValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.as_str(), f) } } From 0a80d9fd93b19f6fee6796b77c11d3b84dc782d3 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Fri, 11 Aug 2023 09:20:55 +0200 Subject: [PATCH 23/43] Update nuid crate to 0.5 --- async-nats/Cargo.toml | 2 +- async-nats/src/jetstream/object_store/mod.rs | 2 +- async-nats/src/service/mod.rs | 2 +- nats-server/Cargo.toml | 2 +- nats-server/src/lib.rs | 6 +++--- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index 7f8bb240a..785f43e94 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -29,7 +29,7 @@ itoa = "1" url = { version = "2"} tokio-rustls = "0.24" rustls-pemfile = "1.0.2" -nuid = "0.4.1" +nuid = "0.5" serde_nanos = "0.1.3" time = { version = "0.3.24", features = ["parsing", "formatting", "serde", "serde-well-known"] } rustls-native-certs = "0.6" diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index 9d1b640df..20966b1a8 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -323,7 +323,7 @@ impl ObjectStore { description: object_meta.description, link: None, bucket: self.name.clone(), - nuid: object_nuid, + nuid: object_nuid.to_string(), chunks: object_chunks, size: object_size, digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))), diff --git a/async-nats/src/service/mod.rs b/async-nats/src/service/mod.rs index 9bec665a8..6cc604861 100644 --- a/async-nats/src/service/mod.rs +++ b/async-nats/src/service/mod.rs @@ -318,7 +318,7 @@ impl Service { "service name is not a valid string (only A-Z, a-z, 0-9, _, - 
are allowed)", ))); } - let id = nuid::next(); + let id = nuid::next().to_string(); let started = time::OffsetDateTime::now_utc(); let subjects = Arc::new(Mutex::new(Vec::new())); let info = Info { diff --git a/nats-server/Cargo.toml b/nats-server/Cargo.toml index 14099dbf4..b665ff67c 100644 --- a/nats-server/Cargo.toml +++ b/nats-server/Cargo.toml @@ -11,7 +11,7 @@ lazy_static = "1.4.0" regex = { version = "1.7.1", default-features = false, features = ["std", "unicode-perl"] } url = "2" json = "0.12" -nuid = "0.4.1" +nuid = "0.5" rand = "0.8" tokio-retry = "0.3.0" diff --git a/nats-server/src/lib.rs b/nats-server/src/lib.rs index 2a9f4cca4..03afc0e80 100644 --- a/nats-server/src/lib.rs +++ b/nats-server/src/lib.rs @@ -240,7 +240,7 @@ pub fn run_server_with_port(cfg: &str, port: Option<&str>) -> Server { } fn do_run(cfg: &str, port: Option<&str>, id: Option) -> Inner { - let id = id.unwrap_or_else(nuid::next); + let id = id.unwrap_or_else(|| nuid::next().to_string()); let logfile = env::temp_dir().join(format!("nats-server-{id}.log")); let pidfile = env::temp_dir().join(format!("nats-server-{id}.pid")); let store_dir = env::temp_dir().join(format!("store-dir-{id}")); @@ -268,7 +268,7 @@ fn do_run(cfg: &str, port: Option<&str>, id: Option) -> Inner { Inner { port: port.map(ToString::to_string), cfg: cfg.to_string(), - id, + id: id.to_string(), child, logfile, pidfile, @@ -327,7 +327,7 @@ fn run_cluster_node_with_port( inner: Inner { port: port.map(ToString::to_string), cfg: cfg.to_string(), - id, + id: id.to_string(), child, logfile, pidfile, From af0779f06559c79f9c8e6573798eefd3ce4508f0 Mon Sep 17 00:00:00 2001 From: Quentin Date: Fri, 11 Aug 2023 17:31:42 +0200 Subject: [PATCH 24/43] Add watch_with_history for Object Store * feat(ObjectStore): Add watch_with_history method * feat(ObjectStore): Add test for watch_with_history --- async-nats/src/jetstream/object_store/mod.rs | 18 +++++- async-nats/tests/object_store.rs | 60 ++++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index 20966b1a8..7db389390 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -30,7 +30,7 @@ use serde::{Deserialize, Serialize}; use tracing::{debug, trace}; use super::consumer::push::OrderedError; -use super::consumer::{StreamError, StreamErrorKind}; +use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind}; use super::context::{PublishError, PublishErrorKind}; use super::stream::{ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind}; use super::{consumer::push::Ordered, stream::StorageType}; @@ -401,11 +401,25 @@ impl ObjectStore { /// # } /// ``` pub async fn watch(&self) -> Result, WatchError> { + self.watch_with_deliver_policy(DeliverPolicy::New).await + } + + /// Creates a [Watch] stream over changes in the [ObjectStore] which yields values whenever + /// there are changes for that key with as well as last value. 
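For reference, the watch_with_history stream added here can be consumed like any other watcher; a minimal sketch, assuming a reachable JetStream-enabled server at demo.nats.io and an existing "store" bucket (both names are illustrative):

    use futures::StreamExt;

    #[tokio::main]
    async fn main() -> Result<(), async_nats::Error> {
        let client = async_nats::connect("demo.nats.io").await?;
        let jetstream = async_nats::jetstream::new(client);
        let store = jetstream.get_object_store("store").await?;

        // Unlike watch(), this first replays the latest metadata entry for each
        // object (LastPerSubject), then keeps yielding live updates.
        let mut watcher = store.watch_with_history().await?;
        while let Some(info) = watcher.next().await {
            let info = info?;
            println!("{} (deleted: {})", info.name, info.deleted);
        }
        Ok(())
    }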
+ pub async fn watch_with_history(&self) -> Result, WatchError> { + self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject) + .await + } + + async fn watch_with_deliver_policy( + &self, + deliver_policy: DeliverPolicy, + ) -> Result, WatchError> { let subject = format!("$O.{}.M.>", self.name); let ordered = self .stream .create_consumer(crate::jetstream::consumer::push::OrderedConfig { - deliver_policy: super::consumer::DeliverPolicy::New, + deliver_policy, deliver_subject: self.stream.context.client.new_inbox(), description: Some("object store watcher".to_string()), filter_subject: subject, diff --git a/async-nats/tests/object_store.rs b/async-nats/tests/object_store.rs index f4fcf74ba..b541b091c 100644 --- a/async-nats/tests/object_store.rs +++ b/async-nats/tests/object_store.rs @@ -110,6 +110,66 @@ mod object_store { assert!(object.deleted); } + #[tokio::test] + async fn watch_with_history() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let jetstream = async_nats::jetstream::new(client); + + let bucket = jetstream + .create_object_store(async_nats::jetstream::object_store::Config { + bucket: "bucket".to_string(), + ..Default::default() + }) + .await + .unwrap(); + + bucket + .put("FOO", &mut std::io::Cursor::new(vec![1, 2, 3, 4])) + .await + .unwrap(); + + bucket + .put("BAR", &mut std::io::Cursor::new(vec![5, 6, 7, 8])) + .await + .unwrap(); + + bucket + .put("FOO", &mut std::io::Cursor::new(vec![9, 0, 1, 2])) + .await + .unwrap(); + + let mut watcher = bucket.watch_with_history().await.unwrap(); + + tokio::task::spawn({ + let bucket = bucket.clone(); + async move { + tokio::time::sleep(Duration::from_millis(100)).await; + bucket + .put("BAR", &mut io::Cursor::new(vec![2, 3, 4, 5])) + .await + .unwrap(); + bucket.delete("BAR").await.unwrap(); + } + }); + + // check to see if we get the values in accordance to the LastPerSubject deliver policy + // we should get `BAR` and only one `FOO` + let object = watcher.next().await.unwrap().unwrap(); + assert_eq!(object.name, "BAR".to_string()); + + let object = watcher.next().await.unwrap().unwrap(); + assert_eq!(object.name, "FOO".to_string()); + + // make sure we get the rest correctly + let object = watcher.next().await.unwrap().unwrap(); + assert_eq!(object.name, "BAR".to_string()); + let object = watcher.next().await.unwrap().unwrap(); + assert_eq!(object.name, "BAR".to_string()); + assert!(object.deleted); + } + #[tokio::test] async fn info() { let server = nats_server::run_server("tests/configs/jetstream.conf"); From b82eb4876ea0dd8970f4b7def892aa21d86ebb19 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Fri, 11 Aug 2023 23:03:32 +0800 Subject: [PATCH 25/43] Implement `AsRef<[u8]>` for `HeaderValue` --- async-nats/src/header.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/async-nats/src/header.rs b/async-nats/src/header.rs index b70af3637..c15534c36 100644 --- a/async-nats/src/header.rs +++ b/async-nats/src/header.rs @@ -231,6 +231,12 @@ impl fmt::Display for HeaderValue { } } +impl AsRef<[u8]> for HeaderValue { + fn as_ref(&self) -> &[u8] { + self.inner.as_ref() + } +} + impl From for String { fn from(header: HeaderValue) -> Self { header.to_string() From 480b24b91a17492c8e6b126d6defbbed4c2aab5c Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Sat, 12 Aug 2023 02:12:40 +0800 Subject: [PATCH 26/43] Implement `AsRef` for `HeaderValue` --- async-nats/src/header.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git 
a/async-nats/src/header.rs b/async-nats/src/header.rs index c15534c36..35953160c 100644 --- a/async-nats/src/header.rs +++ b/async-nats/src/header.rs @@ -237,6 +237,12 @@ impl AsRef<[u8]> for HeaderValue { } } +impl AsRef for HeaderValue { + fn as_ref(&self) -> &str { + self.as_str() + } +} + impl From for String { fn from(header: HeaderValue) -> Self { header.to_string() From bfb8ea8082c868dd4874d48427e3a9e99cf4c20a Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Sat, 12 Aug 2023 02:14:42 +0800 Subject: [PATCH 27/43] Remove `From` for `String` and `str` --- async-nats/src/header.rs | 24 +++--------------------- 1 file changed, 3 insertions(+), 21 deletions(-) diff --git a/async-nats/src/header.rs b/async-nats/src/header.rs index 35953160c..8f46a09ac 100644 --- a/async-nats/src/header.rs +++ b/async-nats/src/header.rs @@ -243,24 +243,6 @@ impl AsRef for HeaderValue { } } -impl From for String { - fn from(header: HeaderValue) -> Self { - header.to_string() - } -} - -impl From<&HeaderValue> for String { - fn from(header: &HeaderValue) -> Self { - header.to_string() - } -} - -impl<'a> From<&'a HeaderValue> for &'a str { - fn from(header: &'a HeaderValue) -> Self { - header.inner.as_str() - } -} - impl FromStr for HeaderValue { type Err = ParseHeaderValueError; @@ -297,7 +279,7 @@ impl HeaderValue { } pub fn as_str(&self) -> &str { - self.into() + self.inner.as_str() } } @@ -614,10 +596,10 @@ mod tests { assert_eq!(headers.get("Key").unwrap().to_string(), "value"); - let key: String = headers.get("Key").unwrap().into(); + let key: String = headers.get("Key").unwrap().as_str().into(); assert_eq!(key, "value".to_string()); - let key: String = headers.get("Key").unwrap().to_owned().into(); + let key: String = headers.get("Key").unwrap().as_str().to_owned(); assert_eq!(key, "value".to_string()); assert_eq!(headers.get("Key").unwrap().as_str(), "value"); From 69ee1bfe53e1d25e39300d8f512d36f36c69131c Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Fri, 11 Aug 2023 13:30:23 +0800 Subject: [PATCH 28/43] Implement `From` for `HeaderValue` on integer types --- async-nats/src/header.rs | 72 +++++++++++++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 8 deletions(-) diff --git a/async-nats/src/header.rs b/async-nats/src/header.rs index 8f46a09ac..fcc7d3ba7 100644 --- a/async-nats/src/header.rs +++ b/async-nats/src/header.rs @@ -243,17 +243,51 @@ impl AsRef for HeaderValue { } } -impl FromStr for HeaderValue { - type Err = ParseHeaderValueError; +impl From for HeaderValue { + fn from(v: i16) -> Self { + Self { + inner: v.to_string(), + } + } +} - fn from_str(s: &str) -> Result { - if s.contains(['\r', '\n']) { - return Err(ParseHeaderValueError); +impl From for HeaderValue { + fn from(v: i32) -> Self { + Self { + inner: v.to_string(), } + } +} - Ok(HeaderValue { - inner: s.to_string(), - }) +impl From for HeaderValue { + fn from(v: i64) -> Self { + Self { + inner: v.to_string(), + } + } +} + +impl From for HeaderValue { + fn from(v: isize) -> Self { + Self { + inner: v.to_string(), + } + } +} + +impl From for HeaderValue { + fn from(v: u16) -> Self { + Self { + inner: v.to_string(), + } + } +} + +impl From for HeaderValue { + fn from(v: u32) -> Self { + Self { + inner: v.to_string(), + } } } @@ -265,6 +299,28 @@ impl From for HeaderValue { } } +impl From for HeaderValue { + fn from(v: usize) -> Self { + Self { + inner: v.to_string(), + } + } +} + +impl FromStr for HeaderValue { + type Err = ParseHeaderValueError; + + fn from_str(s: &str) -> Result { + if s.contains(['\r', '\n']) { + 
return Err(ParseHeaderValueError); + } + + Ok(HeaderValue { + inner: s.to_string(), + }) + } +} + impl From<&str> for HeaderValue { fn from(v: &str) -> Self { Self { From 474cb14e2606cf6cb2603057d98829a72b087dc5 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Tue, 15 Aug 2023 02:35:31 +0800 Subject: [PATCH 29/43] Add requests multiplexing --- async-nats/src/client.rs | 111 +++++++++++++++++++++++++++------------ async-nats/src/lib.rs | 103 ++++++++++++++++++++++++++++++++++++ 2 files changed, 179 insertions(+), 35 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 882f54e40..9791c246f 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -18,7 +18,7 @@ use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber} use crate::error::Error; use bytes::Bytes; use futures::future::TryFutureExt; -use futures::stream::StreamExt; +use futures::StreamExt; use once_cell::sync::Lazy; use regex::Regex; use std::fmt::Display; @@ -26,7 +26,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tracing::trace; static VERSION_RE: Lazy = @@ -71,7 +71,7 @@ impl Client { info, state, sender, - next_subscription_id: Arc::new(AtomicU64::new(0)), + next_subscription_id: Arc::new(AtomicU64::new(1)), subscription_capacity: capacity, inbox_prefix, request_timeout, @@ -335,42 +335,83 @@ impl Client { subject: String, request: Request, ) -> Result { - let inbox = request.inbox.unwrap_or_else(|| self.new_inbox()); - let timeout = request.timeout.unwrap_or(self.request_timeout); - let mut sub = self.subscribe(inbox.clone()).await?; - let payload: Bytes = request.payload.unwrap_or_else(Bytes::new); - match request.headers { - Some(headers) => { - self.publish_with_reply_and_headers(subject, inbox, headers, payload) - .await? + if let Some(inbox) = request.inbox { + let timeout = request.timeout.unwrap_or(self.request_timeout); + let mut sub = self.subscribe(inbox.clone()).await?; + let payload: Bytes = request.payload.unwrap_or_else(Bytes::new); + match request.headers { + Some(headers) => { + self.publish_with_reply_and_headers(subject, inbox, headers, payload) + .await? + } + None => self.publish_with_reply(subject, inbox, payload).await?, } - None => self.publish_with_reply(subject, inbox, payload).await?, - } - self.flush() - .await - .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?; - let request = match timeout { - Some(timeout) => { - tokio::time::timeout(timeout, sub.next()) - .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err)) - .await? + self.flush() + .await + .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?; + let request = match timeout { + Some(timeout) => { + tokio::time::timeout(timeout, sub.next()) + .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err)) + .await? 
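Stepping back to the HeaderValue conversions introduced in the patches above, a small sketch of how they compose; the header name is illustrative, and it assumes HeaderMap::insert accepts a prepared HeaderValue:

    use async_nats::{HeaderMap, HeaderValue};

    fn main() {
        let mut headers = HeaderMap::new();
        // The new From impls let plain integers become header values directly.
        headers.insert("X-Attempt", HeaderValue::from(3u32));
        assert_eq!(headers.get("X-Attempt").unwrap().as_str(), "3");

        // Display and as_str() round-trip a value back to text.
        let offset: HeaderValue = (-1i64).into();
        assert_eq!(offset.to_string(), "-1");
        assert_eq!(offset.as_str(), "-1");
    }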
+ } + None => sub.next().await, + }; + match request { + Some(message) => { + if message.status == Some(StatusCode::NO_RESPONDERS) { + return Err(RequestError::with_source( + RequestErrorKind::NoResponders, + "no responders", + )); + } + Ok(message) + } + None => Err(RequestError::with_source( + RequestErrorKind::Other, + "broken pipe", + )), } - None => sub.next().await, - }; - match request { - Some(message) => { - if message.status == Some(StatusCode::NO_RESPONDERS) { - return Err(RequestError::with_source( - RequestErrorKind::NoResponders, - "no responders", - )); + } else { + let (sender, receiver) = oneshot::channel(); + + let payload = request.payload.unwrap_or_else(Bytes::new); + let respond = self.new_inbox(); + let headers = request.headers; + + self.sender + .send(Command::Request { + subject, + payload, + respond, + headers, + sender, + }) + .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err)) + .await?; + + let timeout = request.timeout.unwrap_or(self.request_timeout); + let request = match timeout { + Some(timeout) => { + tokio::time::timeout(timeout, receiver) + .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err)) + .await? + } + None => receiver.await, + }; + + match request { + Ok(message) => { + if message.status == Some(StatusCode::NO_RESPONDERS) { + return Err(RequestError::with_source( + RequestErrorKind::NoResponders, + "no responders", + )); + } + Ok(message) } - Ok(message) + Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)), } - None => Err(RequestError::with_source( - RequestErrorKind::Other, - "broken pipe", - )), } } diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 200f88f7e..4e3d14e36 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -152,6 +152,7 @@ pub type Error = Box; const VERSION: &str = env!("CARGO_PKG_VERSION"); const LANG: &str = "rust"; const MAX_PENDING_PINGS: usize = 2; +const MULTIPLEXER_SID: u64 = 0; /// A re-export of the `rustls` crate used in this crate, /// for use in cases where manual client configurations @@ -267,6 +268,13 @@ pub(crate) enum Command { respond: Option, headers: Option, }, + Request { + subject: String, + payload: Bytes, + respond: String, + headers: Option, + sender: oneshot::Sender, + }, Subscribe { sid: u64, subject: String, @@ -315,11 +323,19 @@ struct Subscription { max: Option, } +#[derive(Debug)] +struct Multiplexer { + subject: String, + prefix: String, + senders: HashMap>, +} + /// A connection handler which facilitates communication from channels to a single shared connection. 
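From the caller's perspective nothing changes with multiplexing: a request that does not set an explicit inbox now rides on a single shared wildcard inbox subscription managed by the connection handler. A minimal sketch, assuming a reachable server at demo.nats.io and an illustrative "greet" subject:

    use futures::StreamExt;

    #[tokio::main]
    async fn main() -> Result<(), async_nats::Error> {
        let client = async_nats::connect("demo.nats.io").await?;

        // A trivial responder so the requests below get answers.
        let mut subscription = client.subscribe("greet".into()).await?;
        let responder = client.clone();
        tokio::spawn(async move {
            while let Some(message) = subscription.next().await {
                if let Some(reply) = message.reply {
                    responder.publish(reply, "hello".into()).await.ok();
                }
            }
        });

        // Both requests reuse the multiplexed inbox subscription; only a custom
        // per-request inbox falls back to a dedicated subscription.
        let first = client.request("greet".into(), "one".into()).await?;
        let second = client.request("greet".into(), "two".into()).await?;
        println!("{:?} {:?}", first.payload, second.payload);
        Ok(())
    }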
pub(crate) struct ConnectionHandler { connection: Connection, connector: Connector, subscriptions: HashMap, + multiplexer: Option, pending_pings: usize, info_sender: tokio::sync::watch::Sender, ping_interval: Interval, @@ -344,6 +360,7 @@ impl ConnectionHandler { connection, connector, subscriptions: HashMap::new(), + multiplexer: None, pending_pings: 0, info_sender, ping_interval, @@ -484,6 +501,28 @@ impl ConnectionHandler { self.handle_flush().await?; } } + } else if sid == MULTIPLEXER_SID { + if let Some(multiplexer) = self.multiplexer.as_mut() { + let maybe_token = subject.strip_prefix(&multiplexer.prefix).to_owned(); + + if let Some(token) = maybe_token { + if let Some(sender) = multiplexer.senders.remove(token) { + let message = Message { + subject, + reply, + payload, + headers, + status, + description, + length, + }; + + sender.send(message).map_err(|_| { + io::Error::new(io::ErrorKind::Other, "request receiver closed") + })?; + } + } + } } } // TODO: we should probably update advertised server list here too. @@ -591,6 +630,58 @@ impl ConnectionHandler { error!("Sending Subscribe failed with {:?}", err); } } + Command::Request { + subject, + payload, + respond, + headers, + sender, + } => { + let (prefix, token) = respond.rsplit_once('.').ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "malformed request subject") + })?; + + let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() { + multiplexer + } else { + let subject = format!("{}.*", prefix); + + if let Err(err) = self + .connection + .write_op(&ClientOp::Subscribe { + sid: MULTIPLEXER_SID, + subject: subject.clone(), + queue_group: None, + }) + .await + { + error!("Sending Subscribe failed with {:?}", err); + } + + self.multiplexer.insert(Multiplexer { + subject, + prefix: format!("{}.", prefix), + senders: HashMap::new(), + }) + }; + + multiplexer.senders.insert(token.to_owned(), sender); + + let pub_op = ClientOp::Publish { + subject, + payload, + respond: Some(respond), + headers, + }; + + while let Err(err) = self.connection.write_op(&pub_op).await { + self.handle_disconnect().await?; + error!("Sending Publish failed with {:?}", err); + } + + self.connection.flush().await?; + } + Command::Publish { subject, payload, @@ -645,6 +736,18 @@ impl ConnectionHandler { .await .unwrap(); } + + if let Some(multiplexer) = &self.multiplexer { + self.connection + .write_op(&ClientOp::Subscribe { + sid: MULTIPLEXER_SID, + subject: multiplexer.subject.to_owned(), + queue_group: None, + }) + .await + .unwrap(); + } + self.connector.events_tx.try_send(Event::Connected).ok(); Ok(()) From 670f68adfd1605823f77631ce01e97c77207012b Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Wed, 16 Aug 2023 12:29:40 +0200 Subject: [PATCH 30/43] Add object store object link Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/object_store/mod.rs | 165 ++++++++++++++++++- async-nats/tests/object_store.rs | 59 ++++++- 2 files changed, 221 insertions(+), 3 deletions(-) diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index 7db389390..f77dd378f 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -619,6 +619,100 @@ impl ObjectStore { Ok(info) } + + /// Adds a link to an [Object]. + /// It creates a new [Object] in the [ObjectStore] that points to another [Object] + /// and does not have any contents on it's own. 
+ pub async fn add_link( + &self, + name: T, + object: &ObjectInfo, + ) -> Result { + let name = name.to_string(); + if name.is_empty() { + return Err(AddLinkError::new(AddLinkErrorKind::EmptyName)); + } + if object.name.is_empty() { + return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired)); + } + if object.deleted { + return Err(AddLinkError::new(AddLinkErrorKind::Deleted)); + } + if object.link.is_some() { + return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink)); + } + + match self.info(&name).await { + Ok(info) => { + if info.link.is_none() { + return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists)); + } + } + Err(err) if err.kind() != InfoErrorKind::NotFound => { + return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err)) + } + _ => (), + } + + let info = ObjectInfo { + name: name.clone(), + description: None, + link: Some(ObjectLink { + name: Some(object.name.clone()), + bucket: object.bucket.to_string(), + }), + bucket: self.name.clone(), + nuid: nuid::next().to_string(), + size: 0, + chunks: 0, + modified: OffsetDateTime::now_utc(), + digest: None, + deleted: false, + }; + publish_meta(&self, &info).await?; + Ok(info) + } + +async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> { + let encoded_object_name = encode_object_name(&info.name); + let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name); + + let mut headers = HeaderMap::new(); + headers.insert( + NATS_ROLLUP, + ROLLUP_SUBJECT.parse::().map_err(|err| { + PublishMetadataError::with_source( + PublishMetadataErrorKind::Other, + format!("failed parsing header: {}", err), + ) + })?, + ); + let data = serde_json::to_vec(&info).map_err(|err| { + PublishMetadataError::with_source( + PublishMetadataErrorKind::Other, + format!("failed serializing object info: {}", err), + ) + })?; + + store + .stream + .context + .publish_with_headers(subject, headers, data.into()) + .await + .map_err(|err| { + PublishMetadataError::with_source( + PublishMetadataErrorKind::PublishMetadata, + format!("failed publishing metadata: {}", err), + ) + })? + .await + .map_err(|err| { + PublishMetadataError::with_source( + PublishMetadataErrorKind::PublishMetadata, + format!("failed ack from metadata publish: {}", err), + ) + })?; + Ok(()) } pub struct Watch<'a> { @@ -850,9 +944,9 @@ fn is_default(t: &T) -> bool { #[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct ObjectLink { /// Name of the object - pub name: String, + pub name: Option, /// Name of the bucket the object is stored in. - pub bucket: Option, + pub bucket: String, } /// Meta information about an object. 
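With ObjectLink::name now optional (Some for links to an object, None for links to a whole bucket), callers can branch on it; a small illustrative helper, not part of this patch:

    use async_nats::jetstream::object_store::ObjectInfo;

    fn describe(info: &ObjectInfo) -> String {
        match info.link.as_ref() {
            Some(link) => match link.name.as_deref() {
                // A link that points at a concrete object in some bucket.
                Some(object) => format!("link to object '{}' in bucket '{}'", object, link.bucket),
                // A link that points at a whole bucket.
                None => format!("link to bucket '{}'", link.bucket),
            },
            None => format!("regular object '{}' ({} bytes)", info.name, info.size),
        }
    }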
@@ -1052,6 +1146,73 @@ impl Display for PutErrorKind { pub type PutError = Error; +pub type AddLinkError = Error; + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum AddLinkErrorKind { + EmptyName, + ObjectRequired, + Deleted, + LinkToLink, + PublishMetadata, + AlreadyExists, + Other, +} + +impl Display for AddLinkErrorKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AddLinkErrorKind::ObjectRequired => write!(f, "cannot link to empty Object"), + AddLinkErrorKind::Deleted => write!(f, "cannot link a deleted Object"), + AddLinkErrorKind::LinkToLink => write!(f, "cannot link to another link"), + AddLinkErrorKind::EmptyName => write!(f, "link name cannot be empty"), + AddLinkErrorKind::PublishMetadata => write!(f, "failed publishing link metadata"), + AddLinkErrorKind::Other => write!(f, "error"), + AddLinkErrorKind::AlreadyExists => write!(f, "object already exists"), + } + } +} + +type PublishMetadataError = Error; + +#[derive(Clone, Copy, Debug, PartialEq)] +enum PublishMetadataErrorKind { + PublishMetadata, + Other, +} + +impl Display for PublishMetadataErrorKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PublishMetadataErrorKind::PublishMetadata => write!(f, "failed to publish metadata"), + PublishMetadataErrorKind::Other => write!(f, "error"), + } + } +} + +impl From for AddLinkError { + fn from(error: PublishMetadataError) -> Self { + match error.kind { + PublishMetadataErrorKind::PublishMetadata => { + AddLinkError::new(AddLinkErrorKind::PublishMetadata) + } + PublishMetadataErrorKind::Other => { + AddLinkError::with_source(AddLinkErrorKind::Other, error) + } + } + } +} +impl From for PutError { + fn from(error: PublishMetadataError) -> Self { + match error.kind { + PublishMetadataErrorKind::PublishMetadata => { + PutError::new(PutErrorKind::PublishMetadata) + } + PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error), + } + } +} + #[derive(Clone, Copy, Debug, PartialEq)] pub enum WatchErrorKind { TimedOut, diff --git a/async-nats/tests/object_store.rs b/async-nats/tests/object_store.rs index b541b091c..1d6ed6c2e 100644 --- a/async-nats/tests/object_store.rs +++ b/async-nats/tests/object_store.rs @@ -15,7 +15,10 @@ mod object_store { use std::{io, time::Duration}; - use async_nats::jetstream::{object_store::ObjectMeta, stream::DirectGetErrorKind}; + use async_nats::jetstream::{ + object_store::{AddLinkErrorKind, ObjectMeta}, + stream::DirectGetErrorKind, + }; use base64::Engine; use futures::StreamExt; use rand::RngCore; @@ -457,4 +460,58 @@ mod object_store { assert_eq!(info.name, given_metadata.name); assert_eq!(info.description, given_metadata.description); } + + #[tokio::test] + async fn add_link() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let jetstream = async_nats::jetstream::new(client); + + let bucket = jetstream + .create_object_store(async_nats::jetstream::object_store::Config { + bucket: "bucket".to_string(), + ..Default::default() + }) + .await + .unwrap(); + let object = bucket + .put("object", &mut "some data".as_bytes()) + .await + .unwrap(); + + let another_object = bucket + .put("another_object", &mut "other data".as_bytes()) + .await + .unwrap(); + + bucket.add_link("link", &object).await.unwrap(); + + let link_info = bucket.info("link").await.unwrap(); + + assert_eq!( + link_info + .link + .as_ref() + .unwrap() + .name + .as_ref() + 
.unwrap() + .as_str(), + "object" + ); + assert_eq!(link_info.link.as_ref().unwrap().bucket.as_str(), "bucket"); + + let result = bucket + .add_link("object", &another_object) + .await + .unwrap_err(); + assert_eq!(result.kind(), AddLinkErrorKind::AlreadyExists); + + let result = bucket.add_link("", &another_object).await.unwrap_err(); + assert_eq!(result.kind(), AddLinkErrorKind::EmptyName); + + let result = bucket.add_link("new_link", &link_info).await.unwrap_err(); + assert_eq!(result.kind(), AddLinkErrorKind::LinkToLink); + } } From 80926c23198e68290115d1e4179b69a19fbe135c Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Wed, 16 Aug 2023 12:30:05 +0200 Subject: [PATCH 31/43] Add object store bucket link Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/object_store/mod.rs | 46 ++++++++++++++++++++ async-nats/tests/object_store.rs | 29 ++++++++++++ 2 files changed, 75 insertions(+) diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index f77dd378f..b0ed322e0 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -673,6 +673,52 @@ impl ObjectStore { Ok(info) } + /// Adds a link to another [ObjectStore] bucket by creating a new [Object] + /// in the current [ObjectStore] that points to another [ObjectStore] and + /// does not contain any data. + pub async fn add_bucket_link( + &self, + name: T, + bucket: T, + ) -> Result { + let name = name.to_string(); + let bucket = bucket.to_string(); + if name.is_empty() { + return Err(AddLinkError::new(AddLinkErrorKind::EmptyName)); + } + + match self.info(&name).await { + Ok(info) => { + if info.link.is_none() { + return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists)); + } + } + Err(err) if err.kind() != InfoErrorKind::NotFound => { + return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err)) + } + _ => (), + } + + let info = ObjectInfo { + name: name.clone(), + description: None, + link: Some(ObjectLink { + name: None, + bucket: bucket.to_string(), + }), + bucket: self.name.clone(), + nuid: nuid::next().to_string(), + size: 0, + chunks: 0, + modified: OffsetDateTime::now_utc(), + digest: None, + deleted: false, + }; + publish_meta(&self, &info).await?; + Ok(info) + } +} + async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> { let encoded_object_name = encode_object_name(&info.name); let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name); diff --git a/async-nats/tests/object_store.rs b/async-nats/tests/object_store.rs index 1d6ed6c2e..e84c50fb7 100644 --- a/async-nats/tests/object_store.rs +++ b/async-nats/tests/object_store.rs @@ -514,4 +514,33 @@ mod object_store { let result = bucket.add_link("new_link", &link_info).await.unwrap_err(); assert_eq!(result.kind(), AddLinkErrorKind::LinkToLink); } + + #[tokio::test] + async fn add_bucket_link() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let jetstream = async_nats::jetstream::new(client); + + jetstream + .create_object_store(async_nats::jetstream::object_store::Config { + bucket: "another".to_string(), + ..Default::default() + }) + .await + .unwrap(); + let bucket = jetstream + .create_object_store(async_nats::jetstream::object_store::Config { + bucket: "bucket".to_string(), + ..Default::default() + }) + .await + .unwrap(); + + bucket.add_bucket_link("link", "another").await.unwrap(); + + let 
link_info = bucket.info("link").await.unwrap(); + assert!(link_info.link.as_ref().unwrap().name.is_none()); + assert_eq!(link_info.link.as_ref().unwrap().bucket.as_str(), "another"); + } } From f11396366648bb92d9709f2e073208bcdd4f338b Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Wed, 16 Aug 2023 15:44:41 +0200 Subject: [PATCH 32/43] Move Object subscription creation to Object read Until now, the subscription for reading Object data was spawn when calling `bucket.get()`. While that makes sense in many cases when implementing readers, it can lead to unwanted behaviours in NATS. The main issue being - there is no guarantee that user will start reading messages as soon as `bucket.get()` is called. That can lead to unnecesary idling subscription. Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/object_store/mod.rs | 60 +++++++++++++------- 1 file changed, 40 insertions(+), 20 deletions(-) diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index b0ed322e0..9e0387b32 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -20,6 +20,7 @@ use crate::{HeaderMap, HeaderValue}; use base64::engine::general_purpose::{STANDARD, URL_SAFE}; use base64::engine::Engine; use bytes::BytesMut; +use futures::future::BoxFuture; use once_cell::sync::Lazy; use ring::digest::SHA256; use tokio::io::AsyncReadExt; @@ -29,10 +30,10 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use tracing::{debug, trace}; -use super::consumer::push::OrderedError; +use super::consumer::push::{OrderedConfig, OrderedError}; use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind}; use super::context::{PublishError, PublishErrorKind}; -use super::stream::{ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind}; +use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind}; use super::{consumer::push::Ordered, stream::StorageType}; use crate::error::Error; use time::{serde::rfc3339, OffsetDateTime}; @@ -113,20 +114,7 @@ impl ObjectStore { // return self.get(link.name).await; // } - let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid); - - let subscription = self - .stream - .create_consumer(crate::jetstream::consumer::push::OrderedConfig { - filter_subject: chunk_subject, - deliver_subject: self.stream.context.client.new_inbox(), - ..Default::default() - }) - .await? - .messages() - .await?; - - Ok(Object::new(subscription, object_info)) + Ok(Object::new(object_info, self.stream.clone())) } /// Gets an [Object] from the [ObjectStore]. 
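The practical effect is that get() now only resolves metadata and the chunk consumer is created lazily on the first read; a minimal sketch, assuming a reachable server at demo.nats.io and an existing "bucket" store containing "object":

    use tokio::io::AsyncReadExt;

    #[tokio::main]
    async fn main() -> Result<(), async_nats::Error> {
        let client = async_nats::connect("demo.nats.io").await?;
        let jetstream = async_nats::jetstream::new(client);
        let bucket = jetstream.get_object_store("bucket").await?;

        // No consumer (and no idling subscription) is set up here anymore...
        let mut object = bucket.get("object").await?;

        // ...it is only created once reading actually starts.
        let mut contents = Vec::new();
        object.read_to_end(&mut contents).await?;
        println!("read {} bytes", contents.len());
        Ok(())
    }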
@@ -217,7 +205,7 @@ impl ObjectStore { .get_last_raw_message_by_subject(subject.as_str()) .await .map_err(|err| match err.kind() { - super::stream::LastRawMessageErrorKind::NoMessageFound => { + stream::LastRawMessageErrorKind::NoMessageFound => { InfoError::new(InfoErrorKind::NotFound) } _ => InfoError::with_source(InfoErrorKind::Other, err), @@ -852,16 +840,20 @@ pub struct Object<'a> { has_pending_messages: bool, digest: Option, subscription: Option>, + subscription_future: Option, StreamError>>>, + stream: crate::jetstream::stream::Stream, } impl<'a> Object<'a> { - pub(crate) fn new(subscription: Ordered<'a>, info: ObjectInfo) -> Self { + pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self { Object { - subscription: Some(subscription), + subscription: None, info, remaining_bytes: VecDeque::new(), has_pending_messages: true, digest: Some(ring::digest::Context::new(&SHA256)), + subscription_future: None, + stream, } } @@ -886,6 +878,34 @@ impl tokio::io::AsyncRead for Object<'_> { } if self.has_pending_messages { + if self.subscription.is_none() { + let future = match self.subscription_future.as_mut() { + Some(future) => future, + None => { + let stream = self.stream.clone(); + let bucket = self.info.bucket.clone(); + let nuid = self.info.nuid.clone(); + self.subscription_future.insert(Box::pin(async move { + stream + .create_consumer(OrderedConfig { + deliver_subject: stream.context.client.new_inbox(), + filter_subject: format!("$O.{}.C.{}", bucket, nuid), + ..Default::default() + }) + .await + .unwrap() + .messages() + .await + })) + } + }; + match future.as_mut().poll(cx) { + Poll::Ready(subscription) => { + self.subscription = Some(subscription.unwrap()); + } + Poll::Pending => (), + } + } if let Some(subscription) = self.subscription.as_mut() { match subscription.poll_next_unpin(cx) { Poll::Ready(message) => match message { @@ -946,7 +966,7 @@ impl tokio::io::AsyncRead for Object<'_> { Poll::Pending => Poll::Pending, } } else { - Poll::Ready(Ok(())) + Poll::Pending } } else { Poll::Ready(Ok(())) From 7de5eca6d7a7feaaa6e531de64930499104d4722 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Wed, 16 Aug 2023 16:22:03 +0200 Subject: [PATCH 33/43] Allow object store object get following links Signed-off-by: Tomasz Pietrek --- async-nats/Cargo.toml | 1 + async-nats/src/jetstream/object_store/mod.rs | 56 ++++++++++++++------ async-nats/tests/object_store.rs | 11 ++++ 3 files changed, 53 insertions(+), 15 deletions(-) diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index 785f43e94..8f6a8c6fb 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -40,6 +40,7 @@ tokio-retry = "0.3" ring = "0.16" rand = "0.8" webpki = { package = "rustls-webpki", version = "0.101.2", features = ["alloc", "std"] } +async-recursion = "1.0.4" [dev-dependencies] criterion = { version = "0.5", features = ["async_tokio"]} diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index 9e0387b32..a029227b4 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -36,6 +36,7 @@ use super::context::{PublishError, PublishErrorKind}; use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind}; use super::{consumer::push::Ordered, stream::StorageType}; use crate::error::Error; +use async_recursion::async_recursion; use time::{serde::rfc3339, OffsetDateTime}; const DEFAULT_CHUNK_SIZE: usize = 128 * 1024; @@ -108,13 +109,39 @@ impl ObjectStore { /// # 
Ok(()) /// # } /// ``` - pub async fn get>(&self, object_name: T) -> Result, GetError> { - let object_info = self.info(object_name).await?; - // if let Some(link) = object_info.link { - // return self.get(link.name).await; - // } - - Ok(Object::new(object_info, self.stream.clone())) + pub fn get<'bucket, 'object, 'future, T>( + &'bucket self, + object_name: T, + ) -> BoxFuture<'future, Result, GetError>> + where + T: AsRef + Send + 'future, + 'bucket: 'future, + { + Box::pin(async move { + let object_info = self.info(object_name).await?; + if let Some(link) = object_info.link.as_ref() { + if let Some(link_name) = link.name.as_ref() { + let link_name = link_name.clone(); + debug!("getting object via link"); + if link.bucket == self.name { + return self.get(link_name).await; + } else { + let bucket = self + .stream + .context + .get_object_store(&link_name) + .await + .map_err(|err| GetError::with_source(GetErrorKind::Other, err))?; + let object = bucket.get(&link_name).await?; + return Ok(object); + } + } else { + return Err(GetError::new(GetErrorKind::BucketLink)); + } + } + debug!("not a link. Getting the object"); + Ok(Object::new(object_info, self.stream.clone())) + }) } /// Gets an [Object] from the [ObjectStore]. @@ -643,11 +670,11 @@ impl ObjectStore { } let info = ObjectInfo { - name: name.clone(), + name, description: None, link: Some(ObjectLink { name: Some(object.name.clone()), - bucket: object.bucket.to_string(), + bucket: object.bucket.clone(), }), bucket: self.name.clone(), nuid: nuid::next().to_string(), @@ -657,7 +684,7 @@ impl ObjectStore { digest: None, deleted: false, }; - publish_meta(&self, &info).await?; + publish_meta(self, &info).await?; Ok(info) } @@ -690,10 +717,7 @@ impl ObjectStore { let info = ObjectInfo { name: name.clone(), description: None, - link: Some(ObjectLink { - name: None, - bucket: bucket.to_string(), - }), + link: Some(ObjectLink { name: None, bucket }), bucket: self.name.clone(), nuid: nuid::next().to_string(), size: 0, @@ -702,7 +726,7 @@ impl ObjectStore { digest: None, deleted: false, }; - publish_meta(&self, &info).await?; + publish_meta(self, &info).await?; Ok(info) } } @@ -1114,6 +1138,7 @@ pub enum GetErrorKind { InvalidName, ConsumerCreate, NotFound, + BucketLink, Other, TimedOut, } @@ -1126,6 +1151,7 @@ impl Display for GetErrorKind { Self::NotFound => write!(f, "object not found"), Self::TimedOut => write!(f, "timed out"), Self::InvalidName => write!(f, "invalid object name"), + Self::BucketLink => write!(f, "object is a link to a bucket"), } } } diff --git a/async-nats/tests/object_store.rs b/async-nats/tests/object_store.rs index e84c50fb7..ad76211e5 100644 --- a/async-nats/tests/object_store.rs +++ b/async-nats/tests/object_store.rs @@ -70,6 +70,17 @@ mod object_store { object.info.digest ); assert_eq!(result, bytes); + + // Check if following a link works. 
+ bucket.add_link("link", &object.info).await.unwrap(); + + tracing::info!("getting link"); + let mut object_link = bucket.get("link").await.unwrap(); + let mut contents = Vec::new(); + + tracing::info!("reading content"); + object_link.read_to_end(&mut contents).await.unwrap(); + assert_eq!(contents, result); } #[tokio::test] From fe5b542781a4e1709d3825badf2965d2ba500e69 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Wed, 16 Aug 2023 21:20:35 +0200 Subject: [PATCH 34/43] Add examples Signed-off-by: Tomasz Pietrek --- async-nats/Cargo.toml | 1 - async-nats/src/jetstream/object_store/mod.rs | 31 +++++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index 8f6a8c6fb..785f43e94 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -40,7 +40,6 @@ tokio-retry = "0.3" ring = "0.16" rand = "0.8" webpki = { package = "rustls-webpki", version = "0.101.2", features = ["alloc", "std"] } -async-recursion = "1.0.4" [dev-dependencies] criterion = { version = "0.5", features = ["async_tokio"]} diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index a029227b4..961611f88 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -36,7 +36,6 @@ use super::context::{PublishError, PublishErrorKind}; use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind}; use super::{consumer::push::Ordered, stream::StorageType}; use crate::error::Error; -use async_recursion::async_recursion; use time::{serde::rfc3339, OffsetDateTime}; const DEFAULT_CHUNK_SIZE: usize = 128 * 1024; @@ -638,6 +637,22 @@ impl ObjectStore { /// Adds a link to an [Object]. /// It creates a new [Object] in the [ObjectStore] that points to another [Object] /// and does not have any contents on it's own. + /// Links are automatically followed (one level deep) when calling [ObjectStore::get]. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use async_nats::jetstream::object_store; + /// let client = async_nats::connect("demo.nats.io").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let bucket = jetstream.get_object_store("bucket").await?; + /// let object = bucket.get("object").await?; + /// bucket.add_link("link_to_object", &object.info).await?; + /// # Ok(()) + /// # } + /// ``` pub async fn add_link( &self, name: T, @@ -691,6 +706,20 @@ impl ObjectStore { /// Adds a link to another [ObjectStore] bucket by creating a new [Object] /// in the current [ObjectStore] that points to another [ObjectStore] and /// does not contain any data. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use async_nats::jetstream::object_store; + /// let client = async_nats::connect("demo.nats.io").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let bucket = jetstream.get_object_store("bucket").await?; + /// bucket.add_bucket_link("link_to_object", "another_bucket").await?; + /// # Ok(()) + /// # } + /// ``` pub async fn add_bucket_link( &self, name: T, From 8365b9174a080aab218d9aa3d181ca6531bd1e4c Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Wed, 16 Aug 2023 22:11:54 +0200 Subject: [PATCH 35/43] Improve ergonomics of links Before this change, users were forced to pass `info` field from `Object`. 
This is not very ergonomic, as what library needs to add link - a whole [Object], or just [ObjectInfo] should be internal. This change adds a trait that handles that. Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/object_store/mod.rs | 38 ++++++++++++++------ 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index 961611f88..1f160e2dd 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -649,15 +649,16 @@ impl ObjectStore { /// let jetstream = async_nats::jetstream::new(client); /// let bucket = jetstream.get_object_store("bucket").await?; /// let object = bucket.get("object").await?; - /// bucket.add_link("link_to_object", &object.info).await?; + /// bucket.add_link("link_to_object", &object).await?; /// # Ok(()) /// # } /// ``` - pub async fn add_link( - &self, - name: T, - object: &ObjectInfo, - ) -> Result { + pub async fn add_link<'a, T, O>(&self, name: T, object: O) -> Result + where + T: ToString, + O: AsObjectInfo, + { + let object = object.as_info(); let name = name.to_string(); if name.is_empty() { return Err(AddLinkError::new(AddLinkErrorKind::EmptyName)); @@ -714,16 +715,18 @@ impl ObjectStore { /// # async fn main() -> Result<(), async_nats::Error> { /// use async_nats::jetstream::object_store; /// let client = async_nats::connect("demo.nats.io").await?; - /// let jetstream = async_nats::jetstream::new(client); + /// let jetstream = async_nats::jetstream::new(client); /// let bucket = jetstream.get_object_store("bucket").await?; - /// bucket.add_bucket_link("link_to_object", "another_bucket").await?; + /// bucket + /// .add_bucket_link("link_to_object", "another_bucket") + /// .await?; /// # Ok(()) /// # } /// ``` - pub async fn add_bucket_link( + pub async fn add_bucket_link( &self, name: T, - bucket: T, + bucket: U, ) -> Result { let name = name.to_string(); let bucket = bucket.to_string(); @@ -1086,6 +1089,21 @@ impl From<&str> for ObjectMeta { } } +pub trait AsObjectInfo { + fn as_info(&self) -> &ObjectInfo; +} + +impl AsObjectInfo for &Object<'_> { + fn as_info(&self) -> &ObjectInfo { + &self.info + } +} +impl AsObjectInfo for &ObjectInfo { + fn as_info(&self) -> &ObjectInfo { + self + } +} + impl From for ObjectMeta { fn from(info: ObjectInfo) -> Self { ObjectMeta { From 2cecc67a2970284e905287f071d50a9b29cb3cb7 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 21 Aug 2023 10:12:33 +0200 Subject: [PATCH 36/43] Bump serde to 1.0.184 This bump is done to avoid relying on serde 1.0.172 release, which had precompiled binary included, that has been reverted in 1.0.184 1.0.184 release: https://github.com/serde-rs/serde/releases/tag/v1.0.184 context: https://github.com/serde-rs/serde/issues/2538 Signed-off-by: Tomasz Pietrek --- async-nats/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index 785f43e94..32679f76b 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -20,7 +20,7 @@ futures = { version = "0.3.28", default-features = false, features = ["std", "as nkeys = "0.3.0" once_cell = "1.18.0" regex = "1.9.1" -serde = { version = "1.0.179", features = ["derive"] } +serde = { version = "1.0.184", features = ["derive"] } serde_json = "1.0.104" serde_repr = "0.1.16" http = "0.2.9" From 9d9c8a795ed439f87d41917365f1eaca6dd47072 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 21 Aug 2023 11:25:11 +0200 
Subject: [PATCH 37/43] Fix build badge Signed-off-by: Tomasz Pietrek --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 7ae9766de..d9a803996 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ There are two clients available in two separate crates: [![License Apache 2](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0) [![Crates.io](https://img.shields.io/crates/v/async-nats.svg)](https://crates.io/crates/async-nats) [![Documentation](https://docs.rs/async-nats/badge.svg)](https://docs.rs/async-nats/) -[![Build Status](https://github.com/nats-io/nats.rs/workflows/Rust/badge.svg)](https://github.com/nats-io/nats.rs/actions) +[![Build Status](https://github.com/nats-io/nats.rs/actions/workflows/test.yml/badge.svg?branch=main)](https://github.com/nats-io/nats.rs/actions) New async Tokio-based NATS client. @@ -48,7 +48,7 @@ Any feedback related to this client is welcomed. [![License Apache 2](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0) [![Crates.io](https://img.shields.io/crates/v/nats.svg)](https://crates.io/crates/nats) [![Documentation](https://docs.rs/nats/badge.svg)](https://docs.rs/nats/) -[![Build Status](https://github.com/nats-io/nats.rs/workflows/Rust/badge.svg)](https://github.com/nats-io/nats.rs/actions) +[![Build Status](https://github.com/nats-io/nats.rs/actions/workflows/test.yml/badge.svg?branch=main)](https://github.com/nats-io/nats.rs/actions) Legacy *synchronous* client that supports: From 382b94aa893aa05a5c5ed41cda1cf2fcd0ded141 Mon Sep 17 00:00:00 2001 From: Sohum Banerjea Date: Wed, 23 Aug 2023 16:52:14 +1000 Subject: [PATCH 38/43] Correct spelling mistake in jetstream pull consumer --- async-nats/src/jetstream/consumer/pull.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index 02f3ac046..c6a711304 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -398,7 +398,7 @@ impl futures::Stream for Batch { Poll::Ready(maybe_message) => match maybe_message { Some(message) => match message.status.unwrap_or(StatusCode::OK) { StatusCode::TIMEOUT => { - debug!("recived timeout. Iterator done."); + debug!("received timeout. 
Iterator done."); self.terminated = true; Poll::Ready(None) } From 9a1cdbdd6f77e655896b805a995921db76599b4c Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Wed, 23 Aug 2023 09:06:21 -0400 Subject: [PATCH 39/43] Add issue forms Signed-off-by: Byron Ruth --- .github/ISSUE_TEMPLATE/blank_issue.md | 5 -- .github/ISSUE_TEMPLATE/bugs.yml | 58 ---------------------- .github/ISSUE_TEMPLATE/config.yml | 9 ++-- .github/ISSUE_TEMPLATE/defect.yml | 41 +++++++++++++++ .github/ISSUE_TEMPLATE/feature_request.yml | 36 -------------- .github/ISSUE_TEMPLATE/proposal.yml | 34 +++++++++++++ 6 files changed, 81 insertions(+), 102 deletions(-) delete mode 100644 .github/ISSUE_TEMPLATE/blank_issue.md delete mode 100644 .github/ISSUE_TEMPLATE/bugs.yml create mode 100644 .github/ISSUE_TEMPLATE/defect.yml delete mode 100644 .github/ISSUE_TEMPLATE/feature_request.yml create mode 100644 .github/ISSUE_TEMPLATE/proposal.yml diff --git a/.github/ISSUE_TEMPLATE/blank_issue.md b/.github/ISSUE_TEMPLATE/blank_issue.md deleted file mode 100644 index a635f616f..000000000 --- a/.github/ISSUE_TEMPLATE/blank_issue.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -name: Blank Issue -about: Create an issue with a blank template. -labels: 'needs triage' ---- diff --git a/.github/ISSUE_TEMPLATE/bugs.yml b/.github/ISSUE_TEMPLATE/bugs.yml deleted file mode 100644 index efe66dcc8..000000000 --- a/.github/ISSUE_TEMPLATE/bugs.yml +++ /dev/null @@ -1,58 +0,0 @@ -name: Bug Report -description: Report a bug found in the NATS Rust client -labels: ["bug", "needs triage"] -body: - - type: markdown - attributes: - value: | - Make sure you fill all the information in this form and include a [Minimal, Complete, and Verifiable example](https://stackoverflow.com/help/mcve) - - type: input - id: nats_version - attributes: - label: NATS version - description: What is the version of NATS Rust client that you're using? - placeholder: | - Run: grep 'name = "nats"' Cargo.lock -A 1 - validations: - required: true - - type: input - id: rusts_version - attributes: - label: rustc version - description: What is the version of rustc that you're using? - placeholder: | - Run: rustc --version (we support Rust 1.41 and up) - validations: - required: true - - type: input - id: os_container_env - attributes: - label: OS/Container environment - description: What is the OS or container environment you're running the NATS Rust client on? - placeholder: ex. Debian 11.6 - validations: - required: true - - type: textarea - id: steps_to_reproduce - attributes: - label: Steps or code to reproduce the issue - description: How can we reproduce the issue? - placeholder: Your steps/code to reproduce the issue - validations: - requred: true - - type: textarea - id: expected_result - attributes: - label: Expected result - description: What is the expected result? - placeholder: Your expected result - validations: - required: true - - type: textarea - id: actual_result - attributes: - label: Actual result - description: What is the actual result? - placeholder: Your actual result - validations: - required: true diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml index e989d6e87..b98b0060f 100644 --- a/.github/ISSUE_TEMPLATE/config.yml +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -1,5 +1,8 @@ -blank_issues_enabled: true +blank_issues_enabled: false contact_links: - - name: NATS Slack + - name: Discussion + url: https://github.com/nats-io/nats.rs/discussions + about: Ideal for ideas, feedback, or longer form questions. 
+ - name: Chat url: https://slack.nats.io - about: Please ask and answer questions in the rust channel here. + about: Ideal for short, one-off questions, general conversation, and meeting other NATS users! diff --git a/.github/ISSUE_TEMPLATE/defect.yml b/.github/ISSUE_TEMPLATE/defect.yml new file mode 100644 index 000000000..99b4800a8 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/defect.yml @@ -0,0 +1,41 @@ +--- +name: Defect +description: Report a defect, such as a bug or regression. +labels: + - defect +body: + - type: textarea + id: versions + attributes: + label: What version were you using? + description: Include the server version (`nats-server --version`) and any client versions when observing the issue. + validations: + required: true + - type: textarea + id: environment + attributes: + label: What environment was the server running in? + description: This pertains to the operating system, CPU architecture, and/or Docker image that was used. + validations: + required: true + - type: textarea + id: steps + attributes: + label: Is this defect reproducible? + description: Provide best-effort steps to showcase the defect. + validations: + required: true + - type: textarea + id: expected + attributes: + label: Given the capability you are leveraging, describe your expectation? + description: This may be the expected behavior or performance characteristics. + validations: + required: true + - type: textarea + id: actual + attributes: + label: Given the expectation, what is the defect you are observing? + description: This may be an unexpected behavior or regression in performance. + validations: + required: true diff --git a/.github/ISSUE_TEMPLATE/feature_request.yml b/.github/ISSUE_TEMPLATE/feature_request.yml deleted file mode 100644 index a420112df..000000000 --- a/.github/ISSUE_TEMPLATE/feature_request.yml +++ /dev/null @@ -1,36 +0,0 @@ -name: Feature Request -description: Request a feature for the NATS Rust client -labels: ["enhancement", "needs triage"] -body: - - type: textarea - id: use_case - attributes: - label: Use case - description: What is the use case for this feature? - placeholder: Info about the use case with details... - validations: - required: true - - type: textarea - id: proposed_change - attributes: - label: Proposed change - description: What is the change that you propose? - placeholder: Details about the proposed change... - validations: - required: true - - type: input - id: who_benefits - attributes: - label: Who benefits from the change(s)? - description: Who will this be useful to? - placeholder: Beneficiaries... - validations: - required: true - - type: textarea - id: alt_approaches - attributes: - label: Alternative Approaches - description: Are there any alternative approaches? - placeholder: Alt approaches if any... - validations: - required: false diff --git a/.github/ISSUE_TEMPLATE/proposal.yml b/.github/ISSUE_TEMPLATE/proposal.yml new file mode 100644 index 000000000..d7da0ca49 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/proposal.yml @@ -0,0 +1,34 @@ +--- +name: Proposal +description: Propose an enhancement or new feature. +labels: + - proposal +body: + - type: textarea + id: usecase + attributes: + label: What motivated this proposal? + description: Describe the use case justifying this request. + validations: + required: true + - type: textarea + id: change + attributes: + label: What is the proposed change? + description: This could be a behavior change, enhanced API, or a branch new feature. 
+ validations: + required: true + - type: textarea + id: benefits + attributes: + label: Who benefits from this change? + description: Describe how this not only benefits you. + validations: + required: false + - type: textarea + id: alternates + attributes: + label: What alternatives have you evaluated? + description: This could be using existing features or relying on an external dependency. + validations: + required: false From 7cd9be2097ac83a6589215790c6748b9b207d680 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 31 Aug 2023 09:29:03 +0200 Subject: [PATCH 40/43] Fix nats-server clippy errors Signed-off-by: Tomasz Pietrek --- nats-server/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nats-server/src/lib.rs b/nats-server/src/lib.rs index 03afc0e80..57ba39f2e 100644 --- a/nats-server/src/lib.rs +++ b/nats-server/src/lib.rs @@ -38,7 +38,7 @@ struct Inner { lazy_static! { static ref SD_RE: Regex = Regex::new(r#".+\sStore Directory:\s+"([^"]+)""#).unwrap(); - static ref CLIENT_RE: Regex = Regex::new(r#".+\sclient connections on\s+(\S+)"#).unwrap(); + static ref CLIENT_RE: Regex = Regex::new(r".+\sclient connections on\s+(\S+)").unwrap(); } impl Drop for Server { @@ -179,7 +179,7 @@ impl<'a> IntoConfig<'a> for [&'a str; 3] { pub fn run_cluster<'a, C: IntoConfig<'a>>(cfg: C) -> Cluster { let cfg = cfg.into_config(); let port = rand::thread_rng().gen_range(3000..50_000); - let ports = vec![port, port + 100, port + 200]; + let ports = [port, port + 100, port + 200]; let ports = ports .iter() @@ -191,7 +191,7 @@ pub fn run_cluster<'a, C: IntoConfig<'a>>(cfg: C) -> Cluster { new_port }) .collect::>(); - let cluster = vec![port + 1, port + 101, port + 201]; + let cluster = [port + 1, port + 101, port + 201]; let s1 = run_cluster_node_with_port( cfg.0[0], From a9b65340bf609788f11ef86d92bf1bc2c3e40687 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 31 Aug 2023 09:44:54 +0200 Subject: [PATCH 41/43] Fix clippy errors in async-nats client Signed-off-by: Tomasz Pietrek --- async-nats/src/client.rs | 2 +- async-nats/src/jetstream/kv/mod.rs | 4 ++-- async-nats/src/jetstream/object_store/mod.rs | 4 ++-- async-nats/src/service/mod.rs | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 9791c246f..94a15edf9 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -30,7 +30,7 @@ use tokio::sync::{mpsc, oneshot}; use tracing::trace; static VERSION_RE: Lazy = - Lazy::new(|| Regex::new(r#"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?"#).unwrap()); + Lazy::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap()); /// An error returned from the [`Client::publish`], [`Client::publish_with_headers`], /// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions. 
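The clippy patches above come down to two mechanical cleanups: raw-string literals drop their `#` hashes when the pattern contains no `"` character, and `vec![...]` temporaries become fixed-size arrays when the collection is never grown. A small self-contained sketch of both; the lint names in the comments are assumptions, not taken from the patches:

```rust
// Requires the `regex` crate; the pattern below is copied from the bucket-name validators.
use regex::Regex;

fn main() {
    // `r#"..."#` is only needed when the literal itself contains a `"`; for these
    // patterns a plain raw string suffices (clippy: needless_raw_string_hashes).
    let bucket_re = Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap();
    assert!(bucket_re.is_match("my_bucket-1"));

    // A fixed-size array avoids the heap allocation of `vec![...]` when the
    // collection is never resized (clippy: useless_vec).
    let port = 3000u16;
    let ports = [port, port + 100, port + 200];
    for p in &ports {
        println!("cluster node port: {p}");
    }
}
```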
diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index cfb366565..f30db892d 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -67,8 +67,8 @@ fn kv_operation_from_message(message: &Message) -> Result } } -static VALID_BUCKET_RE: Lazy = Lazy::new(|| Regex::new(r#"\A[a-zA-Z0-9_-]+\z"#).unwrap()); -static VALID_KEY_RE: Lazy = Lazy::new(|| Regex::new(r#"\A[-/_=\.a-zA-Z0-9]+\z"#).unwrap()); +static VALID_BUCKET_RE: Lazy = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap()); +static VALID_KEY_RE: Lazy = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap()); pub(crate) const MAX_HISTORY: i64 = 64; const ALL_KEYS: &str = ">"; diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index 1f160e2dd..219bfce0b 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -42,8 +42,8 @@ const DEFAULT_CHUNK_SIZE: usize = 128 * 1024; const NATS_ROLLUP: &str = "Nats-Rollup"; const ROLLUP_SUBJECT: &str = "sub"; -static BUCKET_NAME_RE: Lazy = Lazy::new(|| Regex::new(r#"\A[a-zA-Z0-9_-]+\z"#).unwrap()); -static OBJECT_NAME_RE: Lazy = Lazy::new(|| Regex::new(r#"\A[-/_=\.a-zA-Z0-9]+\z"#).unwrap()); +static BUCKET_NAME_RE: Lazy = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap()); +static OBJECT_NAME_RE: Lazy = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap()); pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool { BUCKET_NAME_RE.is_match(bucket_name) diff --git a/async-nats/src/service/mod.rs b/async-nats/src/service/mod.rs index 6cc604861..31f3b7431 100644 --- a/async-nats/src/service/mod.rs +++ b/async-nats/src/service/mod.rs @@ -48,11 +48,11 @@ pub const NATS_SERVICE_ERROR_CODE: &str = "Nats-Service-Error-Code"; // uses recommended semver validation expression from // https://semver.org/#is-there-a-suggested-regular-expression-regex-to-check-a-semver-string static SEMVER: Lazy = Lazy::new(|| { - Regex::new(r#"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$"#) + Regex::new(r"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$") .unwrap() }); // From ADR-33: Name can only have A-Z, a-z, 0-9, dash, underscore. -static NAME: Lazy = Lazy::new(|| Regex::new(r#"^[A-Za-z0-9\-_]+$"#).unwrap()); +static NAME: Lazy = Lazy::new(|| Regex::new(r"^[A-Za-z0-9\-_]+$").unwrap()); /// Represents state for all endpoints. #[derive(Debug, Clone, Serialize, Deserialize)] From 76e2a02d47cf955ceb348e99264b9642c977fa32 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 31 Aug 2023 09:46:08 +0200 Subject: [PATCH 42/43] Fix clippy errors in nats client Signed-off-by: Tomasz Pietrek --- nats/src/jetstream/push_subscription.rs | 4 ++-- nats/src/subscription.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nats/src/jetstream/push_subscription.rs b/nats/src/jetstream/push_subscription.rs index 5b8ede2f5..9f36b6640 100644 --- a/nats/src/jetstream/push_subscription.rs +++ b/nats/src/jetstream/push_subscription.rs @@ -342,7 +342,7 @@ impl PushSubscription { self.0.stream, self.0.consumer, )) .spawn(move || { - for m in sub.iter() { + for m in &sub { if let Err(e) = handler(m) { // TODO(dlc) - Capture for last error? log::error!("Error in callback! 
{:?}", e); @@ -394,7 +394,7 @@ impl PushSubscription { self.0.consumer, self.0.stream )) .spawn(move || { - for message in sub.iter() { + for message in &sub { if let Err(err) = handler(&message) { log::error!("Error in callback! {:?}", err); } diff --git a/nats/src/subscription.rs b/nats/src/subscription.rs index c650fcc5d..be8bb3f32 100644 --- a/nats/src/subscription.rs +++ b/nats/src/subscription.rs @@ -246,7 +246,7 @@ impl Subscription { thread::Builder::new() .name(format!("nats_subscriber_{}_{}", self.0.sid, self.0.subject)) .spawn(move || { - for m in sub.iter() { + for m in &sub { if let Err(e) = handler(m) { // TODO(dlc) - Capture for last error? log::error!("Error in callback! {:?}", e); From 8661572ccd8a2aece34a70061b2b462604a7a621 Mon Sep 17 00:00:00 2001 From: Sravan S Date: Thu, 31 Aug 2023 17:56:44 +0900 Subject: [PATCH 43/43] Fix typo in doc for ping_interval Issue: ping_interval example was using flush_interval Also, Use duration::from_secs --- async-nats/src/options.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-nats/src/options.rs b/async-nats/src/options.rs index fec11816b..93d7629ef 100644 --- a/async-nats/src/options.rs +++ b/async-nats/src/options.rs @@ -597,7 +597,7 @@ impl ConnectOptions { /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::ConnectError> { /// async_nats::ConnectOptions::new() - /// .flush_interval(Duration::from_millis(100)) + /// .ping_interval(Duration::from_secs(24)) /// .connect("demo.nats.io") /// .await?; /// # Ok(())