diff --git a/async-nats/CHANGELOG.md b/async-nats/CHANGELOG.md index 52d56983f..7b035ac6c 100644 --- a/async-nats/CHANGELOG.md +++ b/async-nats/CHANGELOG.md @@ -1,3 +1,28 @@ +# 0.31.0 +This release focuses on improvements of heartbeats in JetStream Consumers. + +Heartbeats are a tool that tells the user if the given consumer is healthy but does not get any messages or if the reason for no message is an actual problem. +However, if the user was not polling the `Stream` future for the next messages for a long time (because it was slowly processing messages), that could trigger idle heartbeats, as the library could not see the heartbeat messages without messages being polled. + +This release fixes it by starting the idle heartbeat timer only after Stream future is polled (which usually means calling `messages.next().await`). + +## What's Changed +* Fix unwrap from `HeaderName::from_str` call by @caspervonb in https://github.com/nats-io/nats.rs/pull/1032 +* Use idiomatic method for writing `Option` and accessing inner `T` by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1034 +* Add missing sequence number reset by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1035 +* Fix header name range validation by @caspervonb in https://github.com/nats-io/nats.rs/pull/1031 +* Simplify consumer checking logic by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1033 +* Fix millis -> nanos typo in `BatchConfig` `expiration` by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1037 +* Fix kv purge with prefix (thanks @brooksmtownsend for reporting it!) by @Jarema in https://github.com/nats-io/nats.rs/pull/1055 +* Remove memcpy in object store PUT by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1039 +* Drop subscription on list done by @Jarema in https://github.com/nats-io/nats.rs/pull/1041 +* Improve push consumer handling when encountering slow consumers by @Jarema in https://github.com/nats-io/nats.rs/pull/1044 +* Rework idle heartbeat for pull consumers by @Jarema in https://github.com/nats-io/nats.rs/pull/1046 +* Rework push consumer heartbeats handling by @Jarema in https://github.com/nats-io/nats.rs/pull/1048 + + +**Full Changelog**: https://github.com/nats-io/nats.rs/compare/async-nats/v0.30.0...async-nats/v0.30.1 + # 0.30.0 ## Overview This is a big release that introduces almost all breaking changes and API refinements before 1.0.0. diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index f8289c64e..8e55cd322 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "async-nats" authors = ["Tomasz Pietrek ", "Casper Beyer "] -version = "0.30.0" +version = "0.31.0" edition = "2021" rust = "1.64.0" description = "A async Rust NATS client" @@ -29,7 +29,7 @@ itoa = "1" url = { version = "2"} tokio-rustls = "0.24" rustls-pemfile = "1.0.2" -nuid = "0.3.2" +nuid = "0.4.1" serde_nanos = "0.1.3" time = { version = "0.3.20", features = ["parsing", "formatting", "serde", "serde-well-known"] } rustls-native-certs = "0.6" @@ -42,7 +42,7 @@ rand = "0.8" webpki = { package = "rustls-webpki", version = "0.101.1", features = ["alloc", "std"] } [dev-dependencies] -criterion = { version = "0.3", features = ["async_tokio"]} +criterion = { version = "0.5", features = ["async_tokio"]} nats-server = { path = "../nats-server" } rand = "0.8" tokio = { version = "1.25.0", features = ["rt-multi-thread"] } diff --git a/async-nats/src/auth_utils.rs b/async-nats/src/auth_utils.rs index 9b8d59c15..a347563a9 100644 --- a/async-nats/src/auth_utils.rs +++ b/async-nats/src/auth_utils.rs @@ -14,12 +14,12 @@ use nkeys::KeyPair; use once_cell::sync::Lazy; use regex::Regex; -use std::{io, path::PathBuf}; +use std::{io, path::Path}; /// Loads user credentials file with jwt and key. Return file contents. /// Uses tokio non-blocking io -pub(crate) async fn load_creds(path: PathBuf) -> io::Result { - tokio::fs::read_to_string(&path).await.map_err(|err| { +pub(crate) async fn load_creds(path: &Path) -> io::Result { + tokio::fs::read_to_string(path).await.map_err(|err| { io::Error::new( io::ErrorKind::Other, format!("loading creds file '{}': {}", path.display(), err), diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 9be37ad19..882f54e40 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -15,6 +15,7 @@ use crate::connection::State; use crate::ServerInfo; use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber}; +use crate::error::Error; use bytes::Bytes; use futures::future::TryFutureExt; use futures::stream::StreamExt; @@ -609,7 +610,7 @@ impl From> for SubscribeError { } } -#[derive(Debug, PartialEq, Copy, Clone)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum RequestErrorKind { /// There are services listening on requested subject, but they didn't respond /// in time. @@ -620,51 +621,33 @@ pub enum RequestErrorKind { Other, } -/// Error returned when a core NATS request fails. -/// To be enumerate over the variants, call [RequestError::kind]. -#[derive(Debug)] -pub struct RequestError { - kind: RequestErrorKind, - source: Option, -} - -crate::error_impls!(RequestError, RequestErrorKind); - -impl Display for RequestError { +impl Display for RequestErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind { - RequestErrorKind::TimedOut => write!(f, "request timed out"), - RequestErrorKind::NoResponders => write!(f, "no responders"), - RequestErrorKind::Other => write!(f, "request failed: {:?}", self.kind), + match self { + Self::TimedOut => write!(f, "request timed out"), + Self::NoResponders => write!(f, "no responders"), + Self::Other => write!(f, "request failed"), } } } -/// Error returned when flushing the messages buffered on the client fails. -/// To be enumerate over the variants, call [FlushError::kind]. -#[derive(Debug)] -pub struct FlushError { - kind: FlushErrorKind, - source: Option, -} +/// Error returned when a core NATS request fails. +/// To be enumerate over the variants, call [RequestError::kind]. +pub type RequestError = Error; -crate::error_impls!(FlushError, FlushErrorKind); +impl From for RequestError { + fn from(e: PublishError) -> Self { + RequestError::with_source(RequestErrorKind::Other, e) + } +} -impl Display for FlushError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let source_info = self - .source - .as_ref() - .map(|e| e.to_string()) - .unwrap_or_else(|| "no details".into()); - match self.kind { - FlushErrorKind::SendError => write!(f, "failed to send flush request: {}", source_info), - FlushErrorKind::FlushError => write!(f, "flush failed: {}", source_info), - } +impl From for RequestError { + fn from(e: SubscribeError) -> Self { + RequestError::with_source(RequestErrorKind::Other, e) } } -#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum FlushErrorKind { /// Sending the flush failed client side. SendError, @@ -674,13 +657,13 @@ pub enum FlushErrorKind { FlushError, } -impl From for RequestError { - fn from(e: PublishError) -> Self { - RequestError::with_source(RequestErrorKind::Other, e) - } -} -impl From for RequestError { - fn from(e: SubscribeError) -> Self { - RequestError::with_source(RequestErrorKind::Other, e) +impl Display for FlushErrorKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::SendError => write!(f, "failed to send flush request"), + Self::FlushError => write!(f, "flush failed"), + } } } + +pub type FlushError = Error; diff --git a/async-nats/src/connection.rs b/async-nats/src/connection.rs index 6b430d91f..e1104d781 100644 --- a/async-nats/src/connection.rs +++ b/async-nats/src/connection.rs @@ -62,12 +62,10 @@ impl Connection { /// Attempts to read a server operation from the read buffer. /// Returns `None` if there is not enough data to parse an entire operation. pub(crate) fn try_read_op(&mut self) -> Result, io::Error> { - let maybe_len = memchr::memmem::find(&self.buffer, b"\r\n"); - if maybe_len.is_none() { - return Ok(None); - } - - let len = maybe_len.unwrap(); + let len = match memchr::memmem::find(&self.buffer, b"\r\n") { + Some(len) => len, + None => return Ok(None), + }; if self.buffer.starts_with(b"+OK") { self.buffer.advance(len + 2); @@ -88,7 +86,7 @@ impl Connection { let description = str::from_utf8(&self.buffer[5..len]) .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))? .trim_matches('\'') - .to_string(); + .to_owned(); self.buffer.advance(len + 2); @@ -109,27 +107,34 @@ impl Connection { let mut args = line.split(' ').filter(|s| !s.is_empty()); // Parse the operation syntax: MSG [reply-to] <#bytes> - let subject = args.next(); - let sid = args.next(); - let mut reply_to = args.next(); - let mut payload_len = args.next(); - if payload_len.is_none() { - std::mem::swap(&mut reply_to, &mut payload_len); - } - - if subject.is_none() || sid.is_none() || payload_len.is_none() || args.next().is_some() - { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "invalid number of arguments after MSG", - )); - } + let (subject, sid, reply_to, payload_len) = match ( + args.next(), + args.next(), + args.next(), + args.next(), + args.next(), + ) { + (Some(subject), Some(sid), Some(reply_to), Some(payload_len), None) => { + (subject, sid, Some(reply_to), payload_len) + } + (Some(subject), Some(sid), Some(payload_len), None, None) => { + (subject, sid, None, payload_len) + } + _ => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid number of arguments after MSG", + )) + } + }; - let sid = u64::from_str(sid.unwrap()) + let sid = sid + .parse::() .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; // Parse the number of payload bytes. - let payload_len = usize::from_str(payload_len.unwrap()) + let payload_len = payload_len + .parse::() .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; // Return early without advancing if there is not enough data read the entire @@ -138,18 +143,19 @@ impl Connection { return Ok(None); } - let subject = subject.unwrap().to_owned(); - let reply_to = reply_to.map(String::from); + let subject = subject.to_owned(); + let reply_to = reply_to.map(ToOwned::to_owned); self.buffer.advance(len + 2); let payload = self.buffer.split_to(payload_len).freeze(); self.buffer.advance(2); + let length = payload_len + + reply_to.as_ref().map(|reply| reply.len()).unwrap_or(0) + + subject.len(); return Ok(Some(ServerOp::Message { sid, - length: payload_len - + reply_to.as_ref().map(|reply| reply.len()).unwrap_or(0) - + subject.len(), + length, reply: reply_to, headers: None, subject, @@ -165,33 +171,38 @@ impl Connection { let mut args = line.split_whitespace().filter(|s| !s.is_empty()); // [reply-to] <# header bytes><# total bytes> - let subject = args.next(); - let sid = args.next(); - let mut reply_to = args.next(); - let mut num_header_bytes = args.next(); - let mut num_bytes = args.next(); - if num_bytes.is_none() { - std::mem::swap(&mut num_header_bytes, &mut num_bytes); - std::mem::swap(&mut reply_to, &mut num_header_bytes); - } - - if subject.is_none() - || sid.is_none() - || num_header_bytes.is_none() - || num_bytes.is_none() - || args.next().is_some() - { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "invalid number of arguments after HMSG", - )); - } + let (subject, sid, reply_to, header_len, total_len) = match ( + args.next(), + args.next(), + args.next(), + args.next(), + args.next(), + args.next(), + ) { + ( + Some(subject), + Some(sid), + Some(reply_to), + Some(header_len), + Some(total_len), + None, + ) => (subject, sid, Some(reply_to), header_len, total_len), + (Some(subject), Some(sid), Some(header_len), Some(total_len), None, None) => { + (subject, sid, None, header_len, total_len) + } + _ => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid number of arguments after HMSG", + )) + } + }; // Convert the slice into an owned string. - let subject = subject.unwrap().to_string(); + let subject = subject.to_owned(); // Parse the subject ID. - let sid = u64::from_str(sid.unwrap()).map_err(|_| { + let sid = sid.parse::().map_err(|_| { io::Error::new( io::ErrorKind::InvalidInput, "cannot parse sid argument after HMSG", @@ -199,10 +210,10 @@ impl Connection { })?; // Convert the slice into an owned string. - let reply_to = reply_to.map(ToString::to_string); + let reply_to = reply_to.map(ToOwned::to_owned); // Parse the number of payload bytes. - let num_header_bytes = usize::from_str(num_header_bytes.unwrap()).map_err(|_| { + let header_len = header_len.parse::().map_err(|_| { io::Error::new( io::ErrorKind::InvalidInput, "cannot parse the number of header bytes argument after \ @@ -211,14 +222,14 @@ impl Connection { })?; // Parse the number of payload bytes. - let num_bytes = usize::from_str(num_bytes.unwrap()).map_err(|_| { + let total_len = total_len.parse::().map_err(|_| { io::Error::new( io::ErrorKind::InvalidInput, "cannot parse the number of bytes argument after HMSG", ) })?; - if num_bytes < num_header_bytes { + if total_len < header_len { return Err(io::Error::new( io::ErrorKind::InvalidInput, "number of header bytes was greater than or equal to the \ @@ -226,56 +237,51 @@ impl Connection { )); } - if len + num_bytes + 4 > self.buffer.remaining() { + if len + total_len + 4 > self.buffer.remaining() { return Ok(None); } self.buffer.advance(len + 2); - let buffer = self.buffer.split_to(num_header_bytes).freeze(); - let payload = self.buffer.split_to(num_bytes - num_header_bytes).freeze(); + let header = self.buffer.split_to(header_len); + let payload = self.buffer.split_to(total_len - header_len).freeze(); self.buffer.advance(2); - let mut lines = std::str::from_utf8(&buffer).unwrap().lines().peekable(); + let mut lines = std::str::from_utf8(&header) + .map_err(|_| { + io::Error::new(io::ErrorKind::InvalidInput, "header isn't valid utf-8") + })? + .lines() + .peekable(); let version_line = lines.next().ok_or_else(|| { io::Error::new(io::ErrorKind::InvalidInput, "no header version line found") })?; - if !version_line.starts_with("NATS/1.0") { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "header version line does not begin with nats/1.0", - )); - } + let version_line_suffix = version_line + .strip_prefix("NATS/1.0") + .map(str::trim) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "header version line does not begin with `NATS/1.0`", + ) + })?; - let mut maybe_status: Option = None; - let mut maybe_description: Option = None; - if let Some(slice) = version_line.get("NATS/1.0".len()..).map(|s| s.trim()) { - match slice.split_once(' ') { - Some((status, description)) => { - if !status.is_empty() { - maybe_status = Some(status.trim().parse().map_err(|_| { - std::io::Error::new( - io::ErrorKind::Other, - "could not covert Description header into header value", - ) - })?); - } - if !description.is_empty() { - maybe_description = Some(description.trim().to_string()); - } - } - None => { - if !slice.is_empty() { - maybe_status = Some(slice.trim().parse().map_err(|_| { - std::io::Error::new( - io::ErrorKind::Other, - "could not covert Description header into header value", - ) - })?); - } - } - } - } + let (status, description) = version_line_suffix + .split_once(' ') + .map(|(status, description)| (status.trim(), description.trim())) + .unwrap_or((version_line_suffix, "")); + let status = if !status.is_empty() { + Some(status.parse::().map_err(|_| { + std::io::Error::new(io::ErrorKind::Other, "could not parse status parameter") + })?) + } else { + None + }; + let description = if !description.is_empty() { + Some(description.to_owned()) + } else { + None + }; let mut headers = HeaderMap::new(); while let Some(line) = lines.next() { @@ -283,32 +289,35 @@ impl Connection { continue; } - let (key, value) = line.split_once(':').ok_or_else(|| { + let (name, value) = line.split_once(':').ok_or_else(|| { io::Error::new(io::ErrorKind::InvalidInput, "no header version line found") })?; - let mut value = value.to_owned(); + let name = HeaderName::from_str(name) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; + + // Read the header value, which might have been split into multiple lines + // `trim_start` and `trim_end` do the same job as doing `value.trim().to_owned()` at the end, but without a reallocation + let mut value = value.trim_start().to_owned(); while let Some(v) = lines.next_if(|s| s.starts_with(char::is_whitespace)) { value.push_str(v); } + value.truncate(value.trim_end().len()); - let name = HeaderName::from_str(key) - .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; - - headers.append(name, value.trim().to_string()); + headers.append(name, value); } return Ok(Some(ServerOp::Message { - length: reply_to.as_ref().map(|reply| reply.len()).unwrap_or(0) + length: reply_to.as_ref().map_or(0, |reply| reply.len()) + subject.len() - + num_bytes, + + total_len, sid, reply: reply_to, subject, headers: Some(headers), payload, - status: maybe_status, - description: maybe_description, + status, + description, })); } diff --git a/async-nats/src/error/mod.rs b/async-nats/src/error/mod.rs new file mode 100644 index 000000000..0bbf12e14 --- /dev/null +++ b/async-nats/src/error/mod.rs @@ -0,0 +1,160 @@ +use std::fmt::{Debug, Display}; + +/// The error type for the NATS client, generic by the kind of error. +#[derive(Debug)] +pub struct Error +where + Kind: Clone + Debug + Display + PartialEq, +{ + pub(crate) kind: Kind, + pub(crate) source: Option, +} + +impl Error +where + Kind: Clone + Debug + Display + PartialEq, +{ + pub(crate) fn new(kind: Kind) -> Self { + Self { kind, source: None } + } + + pub(crate) fn with_source(kind: Kind, source: S) -> Self + where + S: Into, + { + Self { + kind, + source: Some(source.into()), + } + } + + // In some cases the kind doesn't implement `Copy` trait + pub fn kind(&self) -> Kind { + self.kind.clone() + } +} + +impl Display for Error +where + Kind: Clone + Debug + Display + PartialEq, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(err) = &self.source { + write!(f, "{}: {}", self.kind, err) + } else { + write!(f, "{}", self.kind) + } + } +} + +impl std::error::Error for Error where Kind: Clone + Debug + Display + PartialEq {} + +impl From for Error +where + Kind: Clone + Debug + Display + PartialEq, +{ + fn from(kind: Kind) -> Self { + Self { kind, source: None } + } +} + +/// Enables wrapping source errors to the crate-specific error type +/// by additionally specifying the kind of the target error. +trait WithKind +where + Kind: Clone + Debug + Display + PartialEq, + Self: Into, +{ + fn with_kind(self, kind: Kind) -> Error { + Error:: { + kind, + source: Some(self.into()), + } + } +} + +impl WithKind for E +where + Kind: Clone + Debug + Display + PartialEq, + E: Into, +{ +} + +#[cfg(test)] +mod test { + #![allow(dead_code)] + + use super::*; + use std::fmt::Formatter; + + // Define a custom error kind as a public enum + #[derive(Clone, Debug, PartialEq)] + enum FooErrorKind { + Bar, + Baz, + } + + impl Display for FooErrorKind { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Bar => write!(f, "bar error"), + Self::Baz => write!(f, "baz error"), + } + } + } + + // Define a custom error type as a public struct + type FooError = Error; + + #[test] + fn new() { + let error = FooError::new(FooErrorKind::Bar); + assert_eq!(error.kind, FooErrorKind::Bar); + assert!(error.source.is_none()); + } + + #[test] + fn with_source() { + let source = std::io::Error::new(std::io::ErrorKind::Other, "foo"); + let error = FooError::with_source(FooErrorKind::Bar, source); + assert_eq!(error.kind, FooErrorKind::Bar); + assert_eq!(error.source.unwrap().to_string(), "foo"); + } + + #[test] + fn kind() { + let error: FooError = FooErrorKind::Bar.into(); + let kind = error.kind(); + // ensure the kind can be invoked multiple times even though Copy is not implemented + let _ = error.kind(); + assert_eq!(kind, FooErrorKind::Bar); + } + + #[test] + fn display_with_source() { + let source = std::io::Error::new(std::io::ErrorKind::Other, "foo"); + let error = source.with_kind(FooErrorKind::Bar); + assert_eq!(format!("{}", error), "bar error: foo"); + } + + #[test] + fn display_without_source() { + let error: FooError = FooErrorKind::Bar.into(); + assert_eq!(format!("{}", error), "bar error"); + } + + #[test] + fn from() { + let error: FooError = FooErrorKind::Bar.into(); + assert_eq!(error.kind, FooErrorKind::Bar); + assert!(error.source.is_none()); + } + + #[test] + fn with_kind() { + let source = std::io::Error::new(std::io::ErrorKind::Other, "foo"); + let error: FooError = source.with_kind(FooErrorKind::Baz); + assert_eq!(error.kind(), FooErrorKind::Baz); + assert_eq!(format!("{}", error), "baz error: foo"); + } +} diff --git a/async-nats/src/header.rs b/async-nats/src/header.rs index 2dc753492..d0aa63906 100644 --- a/async-nats/src/header.rs +++ b/async-nats/src/header.rs @@ -404,7 +404,7 @@ standard_headers! { /// The last known sequence number of the message. (NatsLastSequence, NATS_LAST_SEQUENCE, b"Nats-Last-Sequence"); /// The expected last sequence number of the subject. - (NatsExpectgedLastSubjectSequence, NATS_EXPECTED_LAST_SUBJECT_SEQUENCE, b"Nats-Expected-Last-Subject-Sequence"); + (NatsExpectedLastSubjectSequence, NATS_EXPECTED_LAST_SUBJECT_SEQUENCE, b"Nats-Expected-Last-Subject-Sequence"); /// The expected last message ID within the stream. (NatsExpectedLastMessageId, NATS_EXPECTED_LAST_MESSAGE_ID, b"Nats-Expected-Last-Msg-Id"); /// The expected last sequence number within the stream. diff --git a/async-nats/src/jetstream/consumer/mod.rs b/async-nats/src/jetstream/consumer/mod.rs index 313e424f8..dc2ed42a2 100644 --- a/async-nats/src/jetstream/consumer/mod.rs +++ b/async-nats/src/jetstream/consumer/mod.rs @@ -26,8 +26,8 @@ use time::serde::rfc3339; use super::context::RequestError; use super::stream::ClusterInfo; use super::Context; +use crate::error::Error; use crate::jetstream::consumer; -use crate::Error; pub trait IntoConsumerConfig { fn into_consumer_config(self) -> Config; @@ -119,7 +119,9 @@ impl Consumer { /// [Push][crate::jetstream::consumer::push::Config] config. It validates if given config is /// a valid target one. pub trait FromConsumer { - fn try_from_consumer_config(config: crate::jetstream::consumer::Config) -> Result + fn try_from_consumer_config( + config: crate::jetstream::consumer::Config, + ) -> Result where Self: Sized; } @@ -343,7 +345,7 @@ impl IntoConsumerConfig for &Config { } impl FromConsumer for Config { - fn try_from_consumer_config(config: Config) -> Result + fn try_from_consumer_config(config: Config) -> Result where Self: Sized, { @@ -426,24 +428,19 @@ fn is_default(t: &T) -> bool { t == &T::default() } -#[derive(Debug)] -pub struct StreamError { - kind: StreamErrorKind, - source: Option, +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum StreamErrorKind { + TimedOut, + Other, } -crate::error_impls!(StreamError, StreamErrorKind); -impl std::fmt::Display for StreamError { +impl std::fmt::Display for StreamErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.kind() { - StreamErrorKind::TimedOut => write!(f, "timed out"), - StreamErrorKind::Other => write!(f, "failed: {}", self.format_source()), + match self { + Self::TimedOut => write!(f, "timed out"), + Self::Other => write!(f, "failed"), } } } -#[derive(Debug, PartialEq, Clone)] -pub enum StreamErrorKind { - TimedOut, - Other, -} +pub type StreamError = Error; diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index f6cd42a38..e444ff2e7 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -12,7 +12,10 @@ // limitations under the License. use bytes::Bytes; -use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt}; +use futures::{ + future::{BoxFuture, Either}, + FutureExt, StreamExt, TryFutureExt, +}; #[cfg(feature = "server_2_10")] use std::collections::HashMap; @@ -24,8 +27,9 @@ use tracing::{debug, trace}; use crate::{ connection::State, + error::Error, jetstream::{self, Context}, - Error, StatusCode, SubscribeError, Subscriber, + StatusCode, SubscribeError, Subscriber, }; use super::{ @@ -82,7 +86,7 @@ impl Consumer { Stream::stream( BatchConfig { batch: 200, - expires: Some(Duration::from_secs(30).as_nanos().try_into().unwrap()), + expires: Some(Duration::from_secs(30)), no_wait: false, max_bytes: 0, idle_heartbeat: Duration::from_secs(15), @@ -315,7 +319,7 @@ impl Consumer { let request = serde_json::to_vec(&BatchConfig { batch, - expires: Some(Duration::from_secs(60).as_nanos().try_into().unwrap()), + expires: Some(Duration::from_secs(60)), ..Default::default() }) .map(Bytes::from) @@ -345,9 +349,9 @@ impl<'a> Batch { let subscription = consumer.context.client.subscribe(inbox.clone()).await?; consumer.request_batch(batch, inbox.clone()).await?; - let sleep = batch.expires.map(|e| { + let sleep = batch.expires.map(|expires| { Box::pin(tokio::time::sleep( - Duration::from_nanos(e).saturating_add(Duration::from_secs(5)), + expires.saturating_add(Duration::from_secs(5)), )) }); @@ -362,7 +366,7 @@ impl<'a> Batch { } impl futures::Stream for Batch { - type Item = Result; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -547,7 +551,7 @@ impl<'a> Consumer { let stream = Stream::stream( BatchConfig { batch: 500, - expires: Some(Duration::from_secs(30).as_nanos().try_into().unwrap()), + expires: Some(Duration::from_secs(30)), no_wait: false, max_bytes: 0, idle_heartbeat: Duration::from_secs(15), @@ -659,7 +663,9 @@ impl From for Config { } impl FromConsumer for OrderedConfig { - fn try_from_consumer_config(config: crate::jetstream::consumer::Config) -> Result + fn try_from_consumer_config( + config: crate::jetstream::consumer::Config, + ) -> Result where Self: Sized, { @@ -864,13 +870,16 @@ impl Stream { // this is just in edge case of missing response for some reason. let expires = batch_config .expires - .map(|expires| match expires { - 0 => futures::future::Either::Left(future::pending()), - t => futures::future::Either::Right(tokio::time::sleep( - Duration::from_nanos(t).saturating_add(Duration::from_secs(5)), - )), + .map(|expires| { + if expires.is_zero() { + Either::Left(future::pending()) + } else { + Either::Right(tokio::time::sleep( + expires.saturating_add(Duration::from_secs(5)), + )) + } }) - .unwrap_or_else(|| futures::future::Either::Left(future::pending())); + .unwrap_or_else(|| Either::Left(future::pending())); // Need to check previous state, as `changed` will always fire on first // call. let prev_state = context.client.state.borrow().to_owned(); @@ -940,28 +949,31 @@ impl Stream { }) } } -#[derive(Debug)] -pub struct OrderedError { - kind: OrderedErrorKind, - source: Option, + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum OrderedErrorKind { + MissingHeartbeat, + ConsumerDeleted, + Pull, + PushBasedConsumer, + Recreate, + Other, } -impl std::fmt::Display for OrderedError { +impl std::fmt::Display for OrderedErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.kind() { - OrderedErrorKind::MissingHeartbeat => write!(f, "missed idle heartbeat"), - OrderedErrorKind::ConsumerDeleted => write!(f, "consumer deleted"), - OrderedErrorKind::Pull => { - write!(f, "pull request failed: {}", self.format_source()) - } - OrderedErrorKind::Other => write!(f, "error: {}", self.format_source()), - OrderedErrorKind::PushBasedConsumer => write!(f, "cannot use with push consumer"), - OrderedErrorKind::Recreate => write!(f, "consumer recreation failed"), + match self { + Self::MissingHeartbeat => write!(f, "missed idle heartbeat"), + Self::ConsumerDeleted => write!(f, "consumer deleted"), + Self::Pull => write!(f, "pull request failed"), + Self::Other => write!(f, "error"), + Self::PushBasedConsumer => write!(f, "cannot use with push consumer"), + Self::Recreate => write!(f, "consumer recreation failed"), } } } -crate::error_impls!(OrderedError, OrderedErrorKind); +pub type OrderedError = Error; impl From for OrderedError { fn from(err: MessagesError) -> Self { @@ -987,46 +999,28 @@ impl From for OrderedError { } } -#[derive(Debug, Clone, PartialEq)] -pub enum OrderedErrorKind { +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum MessagesErrorKind { MissingHeartbeat, ConsumerDeleted, Pull, PushBasedConsumer, - Recreate, Other, } -#[derive(Debug)] -pub struct MessagesError { - kind: MessagesErrorKind, - source: Option, -} - -impl std::fmt::Display for MessagesError { +impl std::fmt::Display for MessagesErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.kind() { - MessagesErrorKind::MissingHeartbeat => write!(f, "missed idle heartbeat"), - MessagesErrorKind::ConsumerDeleted => write!(f, "consumer deleted"), - MessagesErrorKind::Pull => { - write!(f, "pull request failed: {}", self.format_source()) - } - MessagesErrorKind::Other => write!(f, "error: {}", self.format_source()), - MessagesErrorKind::PushBasedConsumer => write!(f, "cannot use with push consumer"), + match self { + Self::MissingHeartbeat => write!(f, "missed idle heartbeat"), + Self::ConsumerDeleted => write!(f, "consumer deleted"), + Self::Pull => write!(f, "pull request failed"), + Self::Other => write!(f, "error"), + Self::PushBasedConsumer => write!(f, "cannot use with push consumer"), } } } -crate::error_impls!(MessagesError, MessagesErrorKind); - -#[derive(Debug, Clone, PartialEq)] -pub enum MessagesErrorKind { - MissingHeartbeat, - ConsumerDeleted, - Pull, - PushBasedConsumer, - Other, -} +pub type MessagesError = Error; impl futures::Stream for Stream { type Item = Result; @@ -1222,7 +1216,7 @@ pub struct StreamBuilder<'a> { batch: usize, max_bytes: usize, heartbeat: Duration, - expires: u64, + expires: Duration, consumer: &'a Consumer, } @@ -1232,7 +1226,7 @@ impl<'a> StreamBuilder<'a> { consumer, batch: 200, max_bytes: 0, - expires: Duration::from_secs(30).as_nanos().try_into().unwrap(), + expires: Duration::from_secs(30), heartbeat: Duration::default(), } } @@ -1394,7 +1388,7 @@ impl<'a> StreamBuilder<'a> { /// # } /// ``` pub fn expires(mut self, expires: Duration) -> Self { - self.expires = expires.as_nanos().try_into().unwrap(); + self.expires = expires; self } @@ -1482,7 +1476,7 @@ pub struct FetchBuilder<'a> { batch: usize, max_bytes: usize, heartbeat: Duration, - expires: Option, + expires: Option, consumer: &'a Consumer, } @@ -1645,7 +1639,7 @@ impl<'a> FetchBuilder<'a> { /// # } /// ``` pub fn expires(mut self, expires: Duration) -> Self { - self.expires = Some(expires.as_nanos().try_into().unwrap()); + self.expires = Some(expires); self } @@ -1729,7 +1723,7 @@ pub struct BatchBuilder<'a> { batch: usize, max_bytes: usize, heartbeat: Duration, - expires: u64, + expires: Duration, consumer: &'a Consumer, } @@ -1739,7 +1733,7 @@ impl<'a> BatchBuilder<'a> { consumer, batch: 200, max_bytes: 0, - expires: 0, + expires: Duration::ZERO, heartbeat: Duration::default(), } } @@ -1893,7 +1887,7 @@ impl<'a> BatchBuilder<'a> { /// # } /// ``` pub fn expires(mut self, expires: Duration) -> Self { - self.expires = expires.as_nanos().try_into().unwrap(); + self.expires = expires; self } @@ -1947,8 +1941,8 @@ pub struct BatchConfig { pub batch: usize, /// The optional number of nanoseconds that the server will store this next request for /// before forgetting about the pending batch size. - #[serde(skip_serializing_if = "Option::is_none")] - pub expires: Option, + #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")] + pub expires: Option, /// This optionally causes the server not to store this pending request at all, but when there are no /// messages to deliver will send a nil bytes message with a Status header of 404, this way you /// can know when you reached the end of the stream for example. A 409 is returned if the @@ -2102,7 +2096,7 @@ impl IntoConsumerConfig for Config { } } impl FromConsumer for Config { - fn try_from_consumer_config(config: consumer::Config) -> Result { + fn try_from_consumer_config(config: consumer::Config) -> Result { if config.deliver_subject.is_some() { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::Other, @@ -2139,60 +2133,46 @@ impl FromConsumer for Config { } } -#[derive(Debug)] -pub struct BatchRequestError { - kind: BatchRequestErrorKind, - source: Option, +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum BatchRequestErrorKind { + Publish, + Flush, + Serialize, } -crate::error_impls!(BatchRequestError, BatchRequestErrorKind); -impl std::fmt::Display for BatchRequestError { +impl std::fmt::Display for BatchRequestErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.kind() { - BatchRequestErrorKind::Publish => { - write!(f, "publish failed: {}", self.format_source()) - } - BatchRequestErrorKind::Flush => { - write!(f, "flush failed: {}", self.format_source()) - } - BatchRequestErrorKind::Serialize => { - write!(f, "serialize failed: {}", self.format_source()) - } + match self { + Self::Publish => write!(f, "publish failed"), + Self::Flush => write!(f, "flush failed"), + Self::Serialize => write!(f, "serialize failed"), } } } -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum BatchRequestErrorKind { - Publish, +pub type BatchRequestError = Error; + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum BatchErrorKind { + Subscribe, + Pull, Flush, Serialize, } -#[derive(Debug)] -pub struct BatchError { - kind: BatchErrorKind, - source: Option, -} -crate::error_impls!(BatchError, BatchErrorKind); - -impl std::fmt::Display for BatchError { +impl std::fmt::Display for BatchErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.kind() { - BatchErrorKind::Pull => { - write!(f, "pull request failed: {}", self.format_source()) - } - BatchErrorKind::Flush => { - write!(f, "flush failed: {}", self.format_source()) - } - BatchErrorKind::Serialize => { - write!(f, "serialize failed: {}", self.format_source()) - } - BatchErrorKind::Subscribe => write!(f, "subscribe failed: {}", self.format_source()), + match self { + Self::Pull => write!(f, "pull request failed"), + Self::Flush => write!(f, "flush failed"), + Self::Serialize => write!(f, "serialize failed"), + Self::Subscribe => write!(f, "subscribe failed"), } } } +pub type BatchError = Error; + impl From for BatchError { fn from(err: SubscribeError) -> Self { BatchError::with_source(BatchErrorKind::Subscribe, err) @@ -2205,42 +2185,24 @@ impl From for BatchError { } } -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum BatchErrorKind { - Subscribe, - Pull, - Flush, - Serialize, -} - -#[derive(Debug)] -pub struct ConsumerRecreateError { - kind: ConsumerRecreateErrorKind, - source: Option, +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum ConsumerRecreateErrorKind { + GetStream, + Recreate, + TimedOut, } -crate::error_impls!(ConsumerRecreateError, ConsumerRecreateErrorKind); - -impl std::fmt::Display for ConsumerRecreateError { +impl std::fmt::Display for ConsumerRecreateErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.kind() { - ConsumerRecreateErrorKind::GetStream => { - write!(f, "error getting stream: {}", self.format_source()) - } - ConsumerRecreateErrorKind::Recreate => { - write!(f, "consumer creation failed: {}", self.format_source()) - } - ConsumerRecreateErrorKind::TimedOut => write!(f, "timed out"), + match self { + Self::GetStream => write!(f, "error getting stream"), + Self::Recreate => write!(f, "consumer creation failed"), + Self::TimedOut => write!(f, "timed out"), } } } -#[derive(Debug, Clone, PartialEq, Copy)] -pub enum ConsumerRecreateErrorKind { - GetStream, - Recreate, - TimedOut, -} +pub type ConsumerRecreateError = Error; async fn recreate_consumer_stream( context: Context, diff --git a/async-nats/src/jetstream/consumer/push.rs b/async-nats/src/jetstream/consumer/push.rs index c885223be..f406440dc 100644 --- a/async-nats/src/jetstream/consumer/push.rs +++ b/async-nats/src/jetstream/consumer/push.rs @@ -17,8 +17,9 @@ use super::{ }; use crate::{ connection::State, + error::Error, jetstream::{self, Context, Message}, - Error, StatusCode, Subscriber, + StatusCode, Subscriber, }; use bytes::Bytes; @@ -276,7 +277,7 @@ pub struct Config { } impl FromConsumer for Config { - fn try_from_consumer_config(config: super::Config) -> Result { + fn try_from_consumer_config(config: super::Config) -> Result { let deliver_subject = config.deliver_subject.ok_or_else(|| { Box::new(io::Error::new( ErrorKind::Other, @@ -404,7 +405,9 @@ pub struct OrderedConfig { } impl FromConsumer for OrderedConfig { - fn try_from_consumer_config(config: crate::jetstream::consumer::Config) -> Result + fn try_from_consumer_config( + config: crate::jetstream::consumer::Config, + ) -> Result where Self: Sized, { @@ -715,25 +718,29 @@ impl<'a> futures::Stream for Ordered<'a> { } } } -#[derive(Debug)] -pub struct OrderedError { - kind: OrderedErrorKind, - source: Option, + +#[derive(Clone, Debug, PartialEq)] +pub enum OrderedErrorKind { + MissingHeartbeat, + ConsumerDeleted, + PullBasedConsumer, + Recreate, + Other, } -impl std::fmt::Display for OrderedError { +impl std::fmt::Display for OrderedErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.kind() { - OrderedErrorKind::MissingHeartbeat => write!(f, "missed idle heartbeat"), - OrderedErrorKind::ConsumerDeleted => write!(f, "consumer deleted"), - OrderedErrorKind::Other => write!(f, "error: {}", self.format_source()), - OrderedErrorKind::PullBasedConsumer => write!(f, "cannot use with push consumer"), - OrderedErrorKind::Recreate => write!(f, "consumer recreation failed"), + match self { + Self::MissingHeartbeat => write!(f, "missed idle heartbeat"), + Self::ConsumerDeleted => write!(f, "consumer deleted"), + Self::Other => write!(f, "error"), + Self::PullBasedConsumer => write!(f, "cannot use with push consumer"), + Self::Recreate => write!(f, "consumer recreation failed"), } } } -crate::error_impls!(OrderedError, OrderedErrorKind); +pub type OrderedError = Error; impl From for OrderedError { fn from(err: MessagesError) -> Self { @@ -755,72 +762,47 @@ impl From for OrderedError { } } -#[derive(Debug, Clone, PartialEq)] -pub enum OrderedErrorKind { +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum MessagesErrorKind { MissingHeartbeat, ConsumerDeleted, PullBasedConsumer, - Recreate, Other, } -#[derive(Debug)] -pub struct MessagesError { - kind: MessagesErrorKind, - source: Option, -} - -impl std::fmt::Display for MessagesError { +impl std::fmt::Display for MessagesErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.kind() { - MessagesErrorKind::MissingHeartbeat => write!(f, "missed idle heartbeat"), - MessagesErrorKind::ConsumerDeleted => write!(f, "consumer deleted"), - MessagesErrorKind::Other => write!(f, "error: {}", self.format_source()), - MessagesErrorKind::PullBasedConsumer => write!(f, "cannot use with pull consumer"), + match self { + Self::MissingHeartbeat => write!(f, "missed idle heartbeat"), + Self::ConsumerDeleted => write!(f, "consumer deleted"), + Self::Other => write!(f, "error"), + Self::PullBasedConsumer => write!(f, "cannot use with pull consumer"), } } } -crate::error_impls!(MessagesError, MessagesErrorKind); +pub type MessagesError = Error; -#[derive(Debug, Clone, PartialEq)] -pub enum MessagesErrorKind { - MissingHeartbeat, - ConsumerDeleted, - PullBasedConsumer, - Other, -} - -#[derive(Debug)] -pub struct ConsumerRecreateError { - kind: ConsumerRecreateErrorKind, - source: Option, +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum ConsumerRecreateErrorKind { + GetStream, + Subscription, + Recreate, + TimedOut, } -crate::error_impls!(ConsumerRecreateError, ConsumerRecreateErrorKind); - -impl std::fmt::Display for ConsumerRecreateError { +impl std::fmt::Display for ConsumerRecreateErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.kind() { - ConsumerRecreateErrorKind::GetStream => { - write!(f, "error getting stream: {}", self.format_source()) - } - ConsumerRecreateErrorKind::Recreate => { - write!(f, "consumer creation failed: {}", self.format_source()) - } - ConsumerRecreateErrorKind::TimedOut => write!(f, "timed out"), - ConsumerRecreateErrorKind::Subscription => write!(f, "failed to resubscribe"), + match self { + Self::GetStream => write!(f, "error getting stream"), + Self::Recreate => write!(f, "consumer creation failed"), + Self::TimedOut => write!(f, "timed out"), + Self::Subscription => write!(f, "failed to resubscribe"), } } } -#[derive(Debug, Clone, PartialEq, Copy)] -pub enum ConsumerRecreateErrorKind { - GetStream, - Subscription, - Recreate, - TimedOut, -} +pub type ConsumerRecreateError = Error; async fn recreate_consumer_and_subscription( context: Context, diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index 239d73a2e..c090d29dd 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -13,6 +13,7 @@ // //! Manage operations on [Context], create/delete/update [Stream][crate::jetstream::stream::Stream] +use crate::error::Error; use crate::header::{IntoHeaderName, IntoHeaderValue}; use crate::jetstream::account::Account; use crate::jetstream::publish::PublishAck; @@ -723,7 +724,7 @@ impl Context { .await } - // pub async fn update_key_value>(&self, config: C) -> Result<(), Error> { + // pub async fn update_key_value>(&self, config: C) -> Result<(), crate::Error> { // let config = config.borrow(); // if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) { // return Err(Box::new(std::io::Error::new( @@ -953,29 +954,7 @@ impl Context { } } -#[derive(Debug)] -pub struct PublishError { - kind: PublishErrorKind, - source: Option, -} - -crate::error_impls!(PublishError, PublishErrorKind); - -impl Display for PublishError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let source = self.format_source(); - match self.kind { - PublishErrorKind::StreamNotFound => write!(f, "no stream found for given subject"), - PublishErrorKind::TimedOut => write!(f, "timed out: didn't receive ack in time"), - PublishErrorKind::Other => write!(f, "publish failed: {}", source), - PublishErrorKind::BrokenPipe => write!(f, "broken pipe"), - PublishErrorKind::WrongLastMessageId => write!(f, "wrong last message id"), - PublishErrorKind::WrongLastSequence => write!(f, "wrong last sequence"), - } - } -} - -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum PublishErrorKind { StreamNotFound, WrongLastMessageId, @@ -985,6 +964,21 @@ pub enum PublishErrorKind { Other, } +impl Display for PublishErrorKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::StreamNotFound => write!(f, "no stream found for given subject"), + Self::TimedOut => write!(f, "timed out: didn't receive ack in time"), + Self::Other => write!(f, "publish failed"), + Self::BrokenPipe => write!(f, "broken pipe"), + Self::WrongLastMessageId => write!(f, "wrong last message id"), + Self::WrongLastSequence => write!(f, "wrong last sequence"), + } + } +} + +pub type PublishError = Error; + #[derive(Debug)] pub struct PublishAckFuture { timeout: Duration, @@ -1069,7 +1063,7 @@ impl futures::Stream for StreamNames<'_> { std::task::Poll::Ready(page) => { self.page_request = None; let page = page - .map_err(|err| StreamsError::with_source(RequestErrorKind::Other, err))?; + .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?; if let Some(streams) = page.streams { self.offset += streams.len(); self.streams = streams; @@ -1120,18 +1114,8 @@ impl futures::Stream for StreamNames<'_> { type PageInfoRequest<'a> = BoxFuture<'a, Result>; -#[derive(Debug)] -pub struct StreamsError { - kind: RequestErrorKind, - source: Option, -} -crate::error_impls!(StreamsError, RequestErrorKind); - -impl Display for StreamsError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self::Display::fmt(&self, f) - } -} +pub type StreamsErrorKind = RequestErrorKind; +pub type StreamsError = RequestError; pub struct Streams<'a> { context: Context, @@ -1153,7 +1137,7 @@ impl futures::Stream for Streams<'_> { std::task::Poll::Ready(page) => { self.page_request = None; let page = page - .map_err(|err| StreamsError::with_source(RequestErrorKind::Other, err))?; + .map_err(|err| StreamsError::with_source(StreamsErrorKind::Other, err))?; if let Some(streams) = page.streams { self.offset += streams.len(); self.streams = streams; @@ -1268,27 +1252,25 @@ impl Publish { } } -#[derive(Debug)] -pub struct RequestError { - kind: RequestErrorKind, - source: Option, +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum RequestErrorKind { + NoResponders, + TimedOut, + Other, } -crate::error_impls!(RequestError, RequestErrorKind); - -impl Display for RequestError { +impl Display for RequestErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let source = self.format_source(); - match &self.kind { - RequestErrorKind::TimedOut => write!(f, "timed out"), - RequestErrorKind::Other => write!(f, "request failed: {}", source), - RequestErrorKind::NoResponders => { - write!(f, "requested JetStream resource does not exist: {}", source) - } + match self { + Self::TimedOut => write!(f, "timed out"), + Self::Other => write!(f, "request failed"), + Self::NoResponders => write!(f, "requested JetStream resource does not exist"), } } } +pub type RequestError = Error; + impl From for RequestError { fn from(error: crate::RequestError) -> Self { match error.kind() { @@ -1311,44 +1293,35 @@ impl From for RequestError { } } -#[derive(Debug, Clone, PartialEq)] -pub enum RequestErrorKind { - NoResponders, +#[derive(Clone, Debug, PartialEq)] +pub enum CreateStreamErrorKind { + EmptyStreamName, + InvalidStreamName, + DomainAndExternalSet, + JetStreamUnavailable, + JetStream(super::errors::Error), TimedOut, - Other, -} - -#[derive(Debug)] -pub struct CreateStreamError { - kind: CreateStreamErrorKind, - source: Option, + Response, + ResponseParse, } -crate::error_impls!(CreateStreamError, CreateStreamErrorKind); - -impl Display for CreateStreamError { +impl Display for CreateStreamErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.kind { - CreateStreamErrorKind::EmptyStreamName => write!(f, "stream name cannot be empty"), - CreateStreamErrorKind::InvalidStreamName => { - write!(f, "stream name cannot contain `.`, `_`") - } - CreateStreamErrorKind::DomainAndExternalSet => { - write!(f, "domain and external are both set") - } - CreateStreamErrorKind::JetStream(err) => { - write!(f, "jetstream error: {}", err) - } - CreateStreamErrorKind::TimedOut => write!(f, "jetstream request timed out"), - CreateStreamErrorKind::JetStreamUnavailable => write!(f, "jetstream unavailable"), - CreateStreamErrorKind::ResponseParse => write!(f, "failed to parse server response"), - CreateStreamErrorKind::Response => { - write!(f, "response error: {}", self.format_source()) - } + match self { + Self::EmptyStreamName => write!(f, "stream name cannot be empty"), + Self::InvalidStreamName => write!(f, "stream name cannot contain `.`, `_`"), + Self::DomainAndExternalSet => write!(f, "domain and external are both set"), + Self::JetStream(err) => write!(f, "jetstream error: {}", err), + Self::TimedOut => write!(f, "jetstream request timed out"), + Self::JetStreamUnavailable => write!(f, "jetstream unavailable"), + Self::ResponseParse => write!(f, "failed to parse server response"), + Self::Response => write!(f, "response error"), } } } +pub type CreateStreamError = Error; + impl From for CreateStreamError { fn from(error: super::errors::Error) -> Self { CreateStreamError::new(CreateStreamErrorKind::JetStream(error)) @@ -1369,84 +1342,50 @@ impl From for CreateStreamError { } } -#[derive(Debug, Clone, PartialEq)] -pub enum CreateStreamErrorKind { - EmptyStreamName, - InvalidStreamName, - DomainAndExternalSet, - JetStreamUnavailable, +#[derive(Clone, Debug, PartialEq)] +pub enum GetStreamErrorKind { + EmptyName, + Request, JetStream(super::errors::Error), - TimedOut, - Response, - ResponseParse, -} - -#[derive(Debug)] -pub struct GetStreamError { - kind: GetStreamErrorKind, - source: Option, } -crate::error_impls!(GetStreamError, GetStreamErrorKind); - -impl Display for GetStreamError { +impl Display for GetStreamErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind() { - GetStreamErrorKind::EmptyName => write!(f, "empty name cannot be empty"), - GetStreamErrorKind::Request => { - write!(f, "request error: {}", self.format_source()) - } - GetStreamErrorKind::JetStream(err) => write!(f, "jetstream error: {}", err), + match self { + Self::EmptyName => write!(f, "empty name cannot be empty"), + Self::Request => write!(f, "request error"), + Self::JetStream(err) => write!(f, "jetstream error: {}", err), } } } -#[derive(Debug, Clone, PartialEq)] -pub enum GetStreamErrorKind { - EmptyName, - Request, - JetStream(super::errors::Error), -} +pub type GetStreamError = Error; pub type UpdateStreamError = CreateStreamError; pub type UpdateStreamErrorKind = CreateStreamErrorKind; pub type DeleteStreamError = GetStreamError; pub type DeleteStreamErrorKind = GetStreamErrorKind; -#[derive(Debug)] -pub struct KeyValueError { - kind: KeyValueErrorKind, - source: Option, -} - -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum KeyValueErrorKind { InvalidStoreName, GetBucket, JetStream, } -crate::error_impls!(KeyValueError, KeyValueErrorKind); - -impl Display for KeyValueError { +impl Display for KeyValueErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind() { - KeyValueErrorKind::InvalidStoreName => write!(f, "invalid Key Value Store name"), - KeyValueErrorKind::GetBucket => write!(f, "failed to get the bucket"), - KeyValueErrorKind::JetStream => { - write!(f, "JetStream error: {}", self.format_source()) - } + match self { + Self::InvalidStoreName => write!(f, "invalid Key Value Store name"), + Self::GetBucket => write!(f, "failed to get the bucket"), + Self::JetStream => write!(f, "JetStream error"), } } } -#[derive(Debug)] -pub struct CreateKeyValueError { - kind: CreateKeyValueErrorKind, - source: Option, -} +pub type KeyValueError = Error; -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum CreateKeyValueErrorKind { InvalidStoreName, TooLongHistory, @@ -1455,62 +1394,44 @@ pub enum CreateKeyValueErrorKind { TimedOut, } -crate::error_impls!(CreateKeyValueError, CreateKeyValueErrorKind); - -impl Display for CreateKeyValueError { +impl Display for CreateKeyValueErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let source = self.format_source(); - match self.kind() { - CreateKeyValueErrorKind::InvalidStoreName => write!(f, "invalid Key Value Store name"), - CreateKeyValueErrorKind::TooLongHistory => write!(f, "too long history"), - CreateKeyValueErrorKind::JetStream => { - write!(f, "JetStream error: {}", source) - } - CreateKeyValueErrorKind::BucketCreate => { - write!(f, "bucket creation failed: {}", source) - } - CreateKeyValueErrorKind::TimedOut => write!(f, "timed out"), + match self { + Self::InvalidStoreName => write!(f, "invalid Key Value Store name"), + Self::TooLongHistory => write!(f, "too long history"), + Self::JetStream => write!(f, "JetStream error"), + Self::BucketCreate => write!(f, "bucket creation failed"), + Self::TimedOut => write!(f, "timed out"), } } } +pub type CreateKeyValueError = Error; + pub type CreateObjectStoreError = CreateKeyValueError; pub type CreateObjectStoreErrorKind = CreateKeyValueErrorKind; -#[derive(Debug)] -pub struct ObjectStoreError { - kind: ObjectStoreErrorKind, - source: Option, -} -crate::error_impls!(ObjectStoreError, ObjectStoreErrorKind); - -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum ObjectStoreErrorKind { InvalidBucketName, GetStore, } -impl Display for ObjectStoreError { +impl Display for ObjectStoreErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind() { - ObjectStoreErrorKind::InvalidBucketName => { - write!(f, "invalid Object Store bucket name") - } - ObjectStoreErrorKind::GetStore => write!(f, "failed to get Object Store"), + match self { + Self::InvalidBucketName => write!(f, "invalid Object Store bucket name"), + Self::GetStore => write!(f, "failed to get Object Store"), } } } +pub type ObjectStoreError = Error; + pub type DeleteObjectStore = ObjectStoreError; pub type DeleteObjectStoreKind = ObjectStoreErrorKind; -#[derive(Debug)] -pub struct AccountError { - kind: AccountErrorKind, - source: Option>, -} - -#[derive(Debug, PartialEq, Clone)] +#[derive(Clone, Debug, PartialEq)] pub enum AccountErrorKind { TimedOut, JetStream(super::errors::Error), @@ -1518,19 +1439,19 @@ pub enum AccountErrorKind { Other, } -crate::error_impls!(AccountError, AccountErrorKind); - -impl Display for AccountError { +impl Display for AccountErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.kind { - AccountErrorKind::TimedOut => write!(f, "timed out"), - AccountErrorKind::JetStream(err) => write!(f, "JetStream error: {}", err), - AccountErrorKind::Other => write!(f, "error: {}", self.format_source()), - AccountErrorKind::JetStreamUnavailable => write!(f, "JetStream unavailable"), + match self { + Self::TimedOut => write!(f, "timed out"), + Self::JetStream(err) => write!(f, "JetStream error: {}", err), + Self::Other => write!(f, "error"), + Self::JetStreamUnavailable => write!(f, "JetStream unavailable"), } } } +pub type AccountError = Error; + impl From for AccountError { fn from(err: RequestError) -> Self { match err.kind { diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 553e1c051..57fe3d7e4 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -15,7 +15,11 @@ pub mod bucket; -use std::{fmt::Display, task::Poll}; +use std::{ + fmt::{self, Display}, + str::FromStr, + task::Poll, +}; use crate::{HeaderValue, StatusCode}; use bytes::Bytes; @@ -25,6 +29,7 @@ use regex::Regex; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use tracing::debug; +use crate::error::Error; use crate::{header, Message}; use self::bucket::Status; @@ -38,22 +43,27 @@ use super::{ }, }; -// Helper to extract key value operation from message headers -fn kv_operation_from_maybe_headers(maybe_headers: Option<&str>) -> Operation { - if let Some(headers) = maybe_headers { - return match headers { - KV_OPERATION_DELETE => Operation::Delete, - KV_OPERATION_PURGE => Operation::Purge, - _ => Operation::Put, - }; +fn kv_operation_from_stream_message(message: &RawMessage) -> Operation { + match message.headers.as_deref() { + Some(headers) => headers.parse().unwrap_or(Operation::Put), + None => Operation::Put, } - - Operation::Put } -fn kv_operation_from_stream_message(message: &RawMessage) -> Operation { - kv_operation_from_maybe_headers(message.headers.as_deref()) +fn kv_operation_from_message(message: &Message) -> Result { + let headers = message + .headers + .as_ref() + .ok_or_else(|| EntryError::with_source(EntryErrorKind::Other, "missing headers"))?; + + headers + .get(KV_OPERATION) + .map(|x| x.iter().next().unwrap().as_str()) + .unwrap_or(KV_OPERATION_PUT) + .parse() + .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err)) } + static VALID_BUCKET_RE: Lazy = Lazy::new(|| Regex::new(r#"\A[a-zA-Z0-9_-]+\z"#).unwrap()); static VALID_KEY_RE: Lazy = Lazy::new(|| Regex::new(r#"\A[-/_=\.a-zA-Z0-9]+\z"#).unwrap()); @@ -120,6 +130,30 @@ pub enum Operation { Purge, } +impl FromStr for Operation { + type Err = ParseOperationError; + + fn from_str(s: &str) -> Result { + match s { + KV_OPERATION_DELETE => Ok(Operation::Delete), + KV_OPERATION_PURGE => Ok(Operation::Purge), + KV_OPERATION_PUT => Ok(Operation::Put), + _ => Err(ParseOperationError), + } + } +} + +#[derive(Debug, Clone)] +pub struct ParseOperationError; + +impl fmt::Display for ParseOperationError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "invalid value found for operation (value can only be {KV_OPERATION_PUT}, {KV_OPERATION_PURGE} or {KV_OPERATION_DELETE}") + } +} + +impl std::error::Error for ParseOperationError {} + /// A struct used as a handle for the bucket. #[derive(Debug, Clone)] pub struct Store { @@ -258,20 +292,10 @@ impl Store { let headers = message.headers.as_ref().ok_or_else(|| { EntryError::with_source(EntryErrorKind::Other, "missing headers") })?; - let operation = headers.get(KV_OPERATION).map_or_else( - || Operation::Put, - |operation| match operation - .iter() - .next() - .cloned() - .unwrap_or_else(|| KV_OPERATION_PUT.to_string()) - .as_ref() - { - KV_OPERATION_PURGE => Operation::Purge, - KV_OPERATION_DELETE => Operation::Delete, - _ => Operation::Put, - }, - ); + + let operation = + kv_operation_from_message(&message).unwrap_or(Operation::Put); + let sequence = headers .get(header::NATS_SEQUENCE) .ok_or_else(|| { @@ -689,7 +713,13 @@ impl Store { return Err(PurgeError::new(PurgeErrorKind::InvalidKey)); } - let subject = format!("{}{}", self.prefix.as_str(), key.as_ref()); + let mut subject = String::new(); + if self.use_jetstream_prefix { + subject.push_str(&self.stream.context.prefix); + subject.push('.'); + } + subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix)); + subject.push_str(key.as_ref()); let mut headers = crate::HeaderMap::default(); headers.insert(KV_OPERATION, HeaderValue::from(KV_OPERATION_PURGE)); @@ -854,20 +884,7 @@ impl<'a> futures::Stream for Watch<'a> { ) })?; - let operation = match message - .headers - .as_ref() - .and_then(|headers| headers.get(KV_OPERATION)) - .unwrap_or(&HeaderValue::from(KV_OPERATION_PUT)) - .iter() - .next() - .unwrap() - .as_str() - { - KV_OPERATION_DELETE => Operation::Delete, - KV_OPERATION_PURGE => Operation::Purge, - _ => Operation::Put, - }; + let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put); let key = message .subject @@ -928,20 +945,7 @@ impl<'a> futures::Stream for History<'a> { self.done = true; } - let operation = match message - .headers - .as_ref() - .and_then(|headers| headers.get(KV_OPERATION)) - .unwrap_or(&HeaderValue::from(KV_OPERATION_PUT)) - .iter() - .next() - .unwrap() - .as_str() - { - KV_OPERATION_DELETE => Operation::Delete, - KV_OPERATION_PURGE => Operation::Purge, - _ => Operation::Put, - }; + let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put); let key = message .subject @@ -1020,72 +1024,61 @@ pub struct Entry { pub operation: Operation, } -#[derive(Debug)] -pub struct StatusError { - kind: StatusErrorKind, - source: Option>, -} - -#[derive(Debug, PartialEq, Clone)] +#[derive(Clone, Debug, PartialEq)] pub enum StatusErrorKind { JetStream(crate::jetstream::Error), TimedOut, } -crate::error_impls!(StatusError, StatusErrorKind); - -impl Display for StatusError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind.clone() { - StatusErrorKind::JetStream(err) => { - write!(f, "jetstream request failed: {}", err) - } - StatusErrorKind::TimedOut => write!(f, "timed out"), +impl Display for StatusErrorKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::JetStream(err) => write!(f, "jetstream request failed: {}", err), + Self::TimedOut => write!(f, "timed out"), } } } -#[derive(Debug)] -pub struct PutError { - kind: PutErrorKind, - source: Option>, -} +pub type StatusError = Error; -#[derive(Debug, PartialEq, Clone)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum PutErrorKind { InvalidKey, Publish, Ack, } -crate::error_impls!(PutError, PutErrorKind); - -impl Display for PutError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind { - PutErrorKind::Publish => { - write!(f, "failed to put key into store: {}", self.format_source()) - } - PutErrorKind::Ack => write!(f, "ack error: {}", self.format_source()), - PutErrorKind::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"), +impl Display for PutErrorKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Publish => write!(f, "failed to put key into store"), + Self::Ack => write!(f, "ack error"), + Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"), } } } -#[derive(Debug)] -pub struct EntryError { - kind: EntryErrorKind, - source: Option>, -} +pub type PutError = Error; -#[derive(Debug, PartialEq, Clone)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum EntryErrorKind { InvalidKey, TimedOut, Other, } -crate::error_impls!(EntryError, EntryErrorKind); +impl Display for EntryErrorKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"), + Self::TimedOut => write!(f, "timed out"), + Self::Other => write!(f, "failed getting entry"), + } + } +} + +pub type EntryError = Error; + crate::from_with_timeout!( EntryError, EntryErrorKind, @@ -1093,23 +1086,7 @@ crate::from_with_timeout!( DirectGetErrorKind ); -impl Display for EntryError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind { - EntryErrorKind::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"), - EntryErrorKind::TimedOut => write!(f, "timed out"), - EntryErrorKind::Other => write!(f, "failed getting entry: {}", self.format_source()), - } - } -} - -#[derive(Debug)] -pub struct WatchError { - kind: WatchErrorKind, - source: Option>, -} - -#[derive(Debug, PartialEq, Clone)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum WatchErrorKind { InvalidKey, TimedOut, @@ -1117,77 +1094,59 @@ pub enum WatchErrorKind { Other, } -crate::error_impls!(WatchError, WatchErrorKind); -crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind); -crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind); - -impl Display for WatchError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind { - WatchErrorKind::ConsumerCreate => { - write!( - f, - "watch consumer creation failed: {}", - self.format_source() - ) - } - WatchErrorKind::Other => write!(f, "watch failed: {}", self.format_source()), - WatchErrorKind::TimedOut => write!(f, "timed out"), - WatchErrorKind::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"), +impl Display for WatchErrorKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::ConsumerCreate => write!(f, "watch consumer creation failed"), + Self::Other => write!(f, "watch failed"), + Self::TimedOut => write!(f, "timed out"), + Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"), } } } -#[derive(Debug)] -pub struct UpdateError { - kind: UpdateErrorKind, - source: Option>, -} +pub type WatchError = Error; -#[derive(Debug, PartialEq, Clone)] +crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind); +crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind); + +#[derive(Clone, Copy, Debug, PartialEq)] pub enum UpdateErrorKind { InvalidKey, TimedOut, Other, } -crate::error_impls!(UpdateError, UpdateErrorKind); -crate::from_with_timeout!(UpdateError, UpdateErrorKind, PublishError, PublishErrorKind); - -impl Display for UpdateError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind { - UpdateErrorKind::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"), - UpdateErrorKind::TimedOut => write!(f, "timed out"), - UpdateErrorKind::Other => write!(f, "failed getting entry: {}", self.format_source()), +impl Display for UpdateErrorKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"), + Self::TimedOut => write!(f, "timed out"), + Self::Other => write!(f, "failed getting entry"), } } } -#[derive(Debug)] -pub struct WatcherError { - kind: WatcherErrorKind, - source: Option>, -} +pub type UpdateError = Error; -#[derive(Clone, Debug, PartialEq)] +crate::from_with_timeout!(UpdateError, UpdateErrorKind, PublishError, PublishErrorKind); + +#[derive(Clone, Copy, Debug, PartialEq)] pub enum WatcherErrorKind { Consumer, Other, } -impl Display for WatcherError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind { - WatcherErrorKind::Consumer => { - write!(f, "watcher consumer error: {}", self.format_source()) - } - WatcherErrorKind::Other => write!(f, "watcher error: {}", self.format_source()), +impl Display for WatcherErrorKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Consumer => write!(f, "watcher consumer error"), + Self::Other => write!(f, "watcher error"), } } } -crate::error_impls!(WatcherError, WatcherErrorKind); +pub type WatcherError = Error; impl From for WatcherError { fn from(err: OrderedError) -> Self { diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index a5ddd4fe4..b5f4be998 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -12,6 +12,7 @@ // limitations under the License. //! Object Store module +use std::collections::VecDeque; use std::fmt::Display; use std::{cmp, str::FromStr, task::Poll, time::Duration}; @@ -33,6 +34,7 @@ use super::consumer::{StreamError, StreamErrorKind}; use super::context::{PublishError, PublishErrorKind}; use super::stream::{ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind}; use super::{consumer::push::Ordered, stream::StorageType}; +use crate::error::Error; use time::{serde::rfc3339, OffsetDateTime}; const DEFAULT_CHUNK_SIZE: usize = 128 * 1024; @@ -573,7 +575,7 @@ impl Stream for List<'_> { /// Represents an object stored in a bucket. pub struct Object<'a> { pub info: ObjectInfo, - remaining_bytes: Vec, + remaining_bytes: VecDeque, has_pending_messages: bool, digest: Option, subscription: Option>, @@ -584,7 +586,7 @@ impl<'a> Object<'a> { Object { subscription: Some(subscription), info, - remaining_bytes: Vec::new(), + remaining_bytes: VecDeque::new(), has_pending_messages: true, digest: Some(ring::digest::Context::new(&SHA256)), } @@ -602,10 +604,11 @@ impl tokio::io::AsyncRead for Object<'_> { cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll> { - if !self.remaining_bytes.is_empty() { - let len = cmp::min(buf.remaining(), self.remaining_bytes.len()); - buf.put_slice(&self.remaining_bytes[..len]); - self.remaining_bytes = self.remaining_bytes[len..].to_vec(); + let (buf1, _buf2) = self.remaining_bytes.as_slices(); + if !buf1.is_empty() { + let len = cmp::min(buf.remaining(), buf1.len()); + buf.put_slice(&buf1[..len]); + self.remaining_bytes.drain(..len); return Poll::Ready(Ok(())); } @@ -625,8 +628,7 @@ impl tokio::io::AsyncRead for Object<'_> { if let Some(context) = &mut self.digest { context.update(&message.payload); } - self.remaining_bytes - .extend_from_slice(&message.payload[len..]); + self.remaining_bytes.extend(&message.payload[len..]); let info = message.info().map_err(|err| { std::io::Error::new( @@ -740,13 +742,7 @@ impl From<&str> for ObjectMeta { } } -#[derive(Debug)] -pub struct InfoError { - kind: InfoErrorKind, - source: Option>, -} - -#[derive(Debug, PartialEq, Clone)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum InfoErrorKind { InvalidName, NotFound, @@ -754,25 +750,20 @@ pub enum InfoErrorKind { TimedOut, } -impl Display for InfoError { +impl Display for InfoErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind { - InfoErrorKind::InvalidName => write!(f, "invalid object name"), - InfoErrorKind::Other => write!(f, "getting info failed: {}", self.format_source()), - InfoErrorKind::NotFound => write!(f, "not found"), - InfoErrorKind::TimedOut => write!(f, "timed out"), + match self { + Self::InvalidName => write!(f, "invalid object name"), + Self::Other => write!(f, "getting info failed"), + Self::NotFound => write!(f, "not found"), + Self::TimedOut => write!(f, "timed out"), } } } -crate::error_impls!(InfoError, InfoErrorKind); +pub type InfoError = Error; -#[derive(Debug)] -pub struct GetError { - kind: GetErrorKind, - source: Option>, -} -#[derive(Debug, PartialEq, Clone)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum GetErrorKind { InvalidName, ConsumerCreate, @@ -780,7 +771,21 @@ pub enum GetErrorKind { Other, TimedOut, } -crate::error_impls!(GetError, GetErrorKind); + +impl Display for GetErrorKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::ConsumerCreate => write!(f, "failed creating consumer for fetching object"), + Self::Other => write!(f, "failed getting object"), + Self::NotFound => write!(f, "object not found"), + Self::TimedOut => write!(f, "timed out"), + Self::InvalidName => write!(f, "invalid object name"), + } + } +} + +pub type GetError = Error; + crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind); crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind); @@ -795,31 +800,7 @@ impl From for GetError { } } -impl Display for GetError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind() { - GetErrorKind::ConsumerCreate => { - write!( - f, - "failed creating consumer for fetching object: {}", - self.format_source() - ) - } - GetErrorKind::Other => write!(f, "failed getting object: {}", self.format_source()), - GetErrorKind::NotFound => write!(f, "object not found"), - GetErrorKind::TimedOut => write!(f, "timed out"), - GetErrorKind::InvalidName => write!(f, "invalid object name"), - } - } -} - -#[derive(Debug)] -pub struct DeleteError { - kind: DeleteErrorKind, - source: Option>, -} - -#[derive(Debug, Clone, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum DeleteErrorKind { TimedOut, NotFound, @@ -829,21 +810,21 @@ pub enum DeleteErrorKind { Other, } -impl Display for DeleteError { +impl Display for DeleteErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind() { - DeleteErrorKind::TimedOut => write!(f, "timed out"), - DeleteErrorKind::Metadata => { - write!(f, "failed rolling up metadata: {}", self.format_source()) - } - DeleteErrorKind::Chunks => write!(f, "failed purging chunks: {}", self.format_source()), - DeleteErrorKind::Other => write!(f, "delete failed: {}", self.format_source()), - DeleteErrorKind::NotFound => write!(f, "object not found"), - DeleteErrorKind::InvalidName => write!(f, "invalid object name"), + match self { + Self::TimedOut => write!(f, "timed out"), + Self::Metadata => write!(f, "failed rolling up metadata"), + Self::Chunks => write!(f, "failed purging chunks"), + Self::Other => write!(f, "delete failed"), + Self::NotFound => write!(f, "object not found"), + Self::InvalidName => write!(f, "invalid object name"), } } } +pub type DeleteError = Error; + impl From for DeleteError { fn from(err: InfoError) -> Self { match err.kind() { @@ -855,17 +836,10 @@ impl From for DeleteError { } } -crate::error_impls!(DeleteError, DeleteErrorKind); crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind); crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind); -#[derive(Debug)] -pub struct PutError { - kind: PutErrorKind, - source: Option>, -} - -#[derive(Debug, Clone, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum PutErrorKind { InvalidName, ReadChunks, @@ -876,79 +850,48 @@ pub enum PutErrorKind { Other, } -crate::error_impls!(PutError, PutErrorKind); - -impl Display for PutError { +impl Display for PutErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind() { - PutErrorKind::PublishChunks => { - write!( - f, - "failed publishing object chunks: {}", - self.format_source() - ) - } - PutErrorKind::PublishMetadata => { - write!(f, "failed publishing metadata: {}", self.format_source()) - } - PutErrorKind::PurgeOldChunks => { - write!(f, "falied purging old chunks: {}", self.format_source()) - } - PutErrorKind::TimedOut => write!(f, "timed out"), - PutErrorKind::Other => write!(f, "error: {}", self.format_source()), - PutErrorKind::InvalidName => write!(f, "invalid object name"), - PutErrorKind::ReadChunks => write!( - f, - "error while reading the buffer: {}", - self.format_source() - ), + match self { + Self::PublishChunks => write!(f, "failed publishing object chunks"), + Self::PublishMetadata => write!(f, "failed publishing metadata"), + Self::PurgeOldChunks => write!(f, "failed purging old chunks"), + Self::TimedOut => write!(f, "timed out"), + Self::Other => write!(f, "error"), + Self::InvalidName => write!(f, "invalid object name"), + Self::ReadChunks => write!(f, "error while reading the buffer"), } } } -#[derive(Debug)] -pub struct WatchError { - kind: WatchErrorKind, - source: Option>, -} +pub type PutError = Error; -#[derive(Debug, PartialEq, Clone)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum WatchErrorKind { TimedOut, ConsumerCreate, Other, } -crate::error_impls!(WatchError, WatchErrorKind); -crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind); -crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind); - -impl Display for WatchError { +impl Display for WatchErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind { - WatchErrorKind::ConsumerCreate => { - write!( - f, - "watch consumer creation failed: {}", - self.format_source() - ) - } - WatchErrorKind::Other => write!(f, "watch failed: {}", self.format_source()), - WatchErrorKind::TimedOut => write!(f, "timed out"), + match self { + Self::ConsumerCreate => write!(f, "watch consumer creation failed"), + Self::Other => write!(f, "watch failed"), + Self::TimedOut => write!(f, "timed out"), } } } +pub type WatchError = Error; + +crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind); +crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind); + pub type ListError = WatchError; pub type ListErrorKind = WatchErrorKind; -#[derive(Debug)] -pub struct SealError { - kind: SealErrorKind, - source: Option>, -} - -#[derive(Debug, Clone, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum SealErrorKind { TimedOut, Other, @@ -956,25 +899,19 @@ pub enum SealErrorKind { Update, } -crate::error_impls!(SealError, SealErrorKind); - -impl Display for SealError { +impl Display for SealErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind { - SealErrorKind::TimedOut => write!(f, "timed out"), - SealErrorKind::Other => write!(f, "seal failed: {}", self.format_source()), - SealErrorKind::Info => write!( - f, - "failed getting stream info before sealing bucket: {}", - self.format_source() - ), - SealErrorKind::Update => { - write!(f, "failed sealing the bucket: {}", self.format_source()) - } + match self { + Self::TimedOut => write!(f, "timed out"), + Self::Other => write!(f, "seal failed"), + Self::Info => write!(f, "failed getting stream info before sealing bucket"), + Self::Update => write!(f, "failed sealing the bucket"), } } } +pub type SealError = Error; + impl From for SealError { fn from(err: super::context::UpdateStreamError) -> Self { match err.kind() { @@ -986,30 +923,22 @@ impl From for SealError { } } -#[derive(Debug)] -pub struct WatcherError { - kind: WatcherErrorKind, - source: Option>, -} - -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum WatcherErrorKind { ConsumerError, Other, } -impl Display for WatcherError { +impl Display for WatcherErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind { - WatcherErrorKind::ConsumerError => { - write!(f, "watcher consumer error: {}", self.format_source()) - } - WatcherErrorKind::Other => write!(f, "watcher error: {}", self.format_source()), + match self { + Self::ConsumerError => write!(f, "watcher consumer error"), + Self::Other => write!(f, "watcher error"), } } } -crate::error_impls!(WatcherError, WatcherErrorKind); +pub type WatcherError = Error; impl From for WatcherError { fn from(err: OrderedError) -> Self { diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index a32eb0511..0283753b0 100644 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -25,8 +25,9 @@ use std::{ time::Duration, }; -use crate::{header::HeaderName, is_valid_subject, HeaderMap, HeaderValue}; -use crate::{Error, StatusCode}; +use crate::{ + error::Error, header::HeaderName, is_valid_subject, HeaderMap, HeaderValue, StatusCode, +}; use base64::engine::general_purpose::STANDARD; use base64::engine::Engine; use bytes::Bytes; @@ -37,7 +38,7 @@ use time::{serde::rfc3339, OffsetDateTime}; use super::{ consumer::{self, Consumer, FromConsumer, IntoConsumerConfig}, - context::{RequestError, RequestErrorKind, StreamsError}, + context::{RequestError, RequestErrorKind, StreamsError, StreamsErrorKind}, errors::ErrorCode, response::Response, Context, Message, @@ -45,14 +46,7 @@ use super::{ pub type InfoError = RequestError; -#[derive(Debug)] -pub struct DirectGetError { - kind: DirectGetErrorKind, - source: Option, -} -crate::error_impls!(DirectGetError, DirectGetErrorKind); - -#[derive(Debug, Clone, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub enum DirectGetErrorKind { NotFound, InvalidSubject, @@ -62,24 +56,23 @@ pub enum DirectGetErrorKind { Other, } -impl Display for DirectGetError { +impl Display for DirectGetErrorKind { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let source = self.format_source(); - match self.kind() { - DirectGetErrorKind::InvalidSubject => write!(f, "invalid subject"), - DirectGetErrorKind::NotFound => write!(f, "message not found"), - DirectGetErrorKind::ErrorResponse(status, description) => { + match self { + Self::InvalidSubject => write!(f, "invalid subject"), + Self::NotFound => write!(f, "message not found"), + Self::ErrorResponse(status, description) => { write!(f, "unable to get message: {} {}", status, description) } - DirectGetErrorKind::Other => { - write!(f, "error getting message: {}", source) - } - DirectGetErrorKind::TimedOut => write!(f, "timed out"), - DirectGetErrorKind::Request => write!(f, "request failed: {}", source), + Self::Other => write!(f, "error getting message"), + Self::TimedOut => write!(f, "timed out"), + Self::Request => write!(f, "request failed"), } } } +pub type DirectGetError = Error; + impl From for DirectGetError { fn from(err: crate::RequestError) -> Self { match err.kind() { @@ -98,31 +91,25 @@ impl From for DirectGetError { } } -#[derive(Debug)] -pub struct DeleteMessageError { - kind: DeleteMessageErrorKind, - source: Option, -} - -#[derive(Debug, Clone, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub enum DeleteMessageErrorKind { Request, TimedOut, JetStream(super::errors::Error), } -crate::error_impls!(DeleteMessageError, DeleteMessageErrorKind); -impl Display for DeleteMessageError { +impl Display for DeleteMessageErrorKind { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let source = self.format_source(); - match &self.kind { - DeleteMessageErrorKind::Request => write!(f, "request failed: {}", source), - DeleteMessageErrorKind::TimedOut => write!(f, "timed out"), - DeleteMessageErrorKind::JetStream(err) => write!(f, "JetStream error: {}", err), + match self { + Self::Request => write!(f, "request failed"), + Self::TimedOut => write!(f, "timed out"), + Self::JetStream(err) => write!(f, "JetStream error: {}", err), } } } +pub type DeleteMessageError = Error; + /// Handle to operations that can be performed on a `Stream`. #[derive(Debug, Clone)] pub struct Stream { @@ -535,7 +522,7 @@ impl Stream { /// # Ok(()) /// # } /// ``` - pub async fn get_raw_message(&self, sequence: u64) -> Result { + pub async fn get_raw_message(&self, sequence: u64) -> Result { let subject = format!("STREAM.MSG.GET.{}", &self.info.config.name); let payload = json!({ "seq": sequence, @@ -797,7 +784,10 @@ impl Stream { /// # Ok(()) /// # } /// ``` - pub async fn consumer_info>(&self, name: T) -> Result { + pub async fn consumer_info>( + &self, + name: T, + ) -> Result { let name = name.as_ref(); let subject = format!("CONSUMER.INFO.{}.{}", self.info.config.name, name); @@ -832,7 +822,7 @@ impl Stream { pub async fn get_consumer( &self, name: &str, - ) -> Result, Error> { + ) -> Result, crate::Error> { let info = self.consumer_info(name).await?; Ok(Consumer::new( @@ -1228,7 +1218,7 @@ pub struct RawMessage { } impl TryFrom for crate::Message { - type Error = Error; + type Error = crate::Error; fn try_from(value: RawMessage) -> Result { let decoded_payload = STANDARD @@ -1268,7 +1258,7 @@ const HEADER_LINE: &str = "NATS/1.0"; #[allow(clippy::type_complexity)] fn parse_headers( buf: &[u8], -) -> Result<(Option, Option, Option), Error> { +) -> Result<(Option, Option, Option), crate::Error> { let mut headers = HeaderMap::new(); let mut maybe_status: Option = None; let mut maybe_description: Option = None; @@ -1536,31 +1526,25 @@ where } } -#[derive(Debug)] -pub struct PurgeError { - kind: PurgeErrorKind, - source: Option, -} - -#[derive(Debug, Clone, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub enum PurgeErrorKind { Request, TimedOut, JetStream(super::errors::Error), } -crate::error_impls!(PurgeError, PurgeErrorKind); -impl Display for PurgeError { +impl Display for PurgeErrorKind { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let source = self.format_source(); - match &self.kind { - PurgeErrorKind::Request => write!(f, "request failed: {}", source), - PurgeErrorKind::TimedOut => write!(f, "timed out"), - PurgeErrorKind::JetStream(err) => write!(f, "JetStream error: {}", err), + match self { + Self::Request => write!(f, "request failed"), + Self::TimedOut => write!(f, "timed out"), + Self::JetStream(err) => write!(f, "JetStream error: {}", err), } } } +pub type PurgeError = Error; + impl<'a, S, K> IntoFuture for Purge<'a, S, K> where S: ToAssign + std::marker::Send, @@ -1603,6 +1587,7 @@ struct ConsumerInfoPage { consumers: Option>, } +type ConsumerNamesErrorKind = StreamsErrorKind; type ConsumerNamesError = StreamsError; type PageRequest<'a> = BoxFuture<'a, Result>; @@ -1627,7 +1612,7 @@ impl futures::Stream for ConsumerNames<'_> { std::task::Poll::Ready(page) => { self.page_request = None; let page = page.map_err(|err| { - ConsumerNamesError::with_source(RequestErrorKind::Other, err) + ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err) })?; if let Some(consumers) = page.consumers { @@ -1680,6 +1665,7 @@ impl futures::Stream for ConsumerNames<'_> { } } +pub type ConsumersErrorKind = StreamsErrorKind; pub type ConsumersError = StreamsError; type PageInfoRequest<'a> = BoxFuture<'a, Result>; @@ -1703,8 +1689,9 @@ impl futures::Stream for Consumers<'_> { Some(page) => match page.try_poll_unpin(cx) { std::task::Poll::Ready(page) => { self.page_request = None; - let page = page - .map_err(|err| ConsumersError::with_source(RequestErrorKind::Other, err))?; + let page = page.map_err(|err| { + ConsumersError::with_source(ConsumersErrorKind::Other, err) + })?; if let Some(consumers) = page.consumers { self.offset += consumers.len(); self.consumers = consumers; @@ -1755,59 +1742,49 @@ impl futures::Stream for Consumers<'_> { } } -#[derive(Debug)] -pub struct LastRawMessageError { - source: Option, - kind: LastRawMessageErrorKind, +#[derive(Clone, Debug, PartialEq)] +pub enum LastRawMessageErrorKind { + NoMessageFound, + JetStream(super::errors::Error), + Other, } -crate::error_impls!(LastRawMessageError, LastRawMessageErrorKind); -impl fmt::Display for LastRawMessageError { +impl Display for LastRawMessageErrorKind { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match &self.kind { - LastRawMessageErrorKind::NoMessageFound => write!(f, "no message found"), - LastRawMessageErrorKind::Other => write!( - f, - "failed to get last raw message: {}", - self.format_source() - ), - LastRawMessageErrorKind::JetStream(err) => { - write!(f, "JetStream error: {}", err) - } + match self { + Self::NoMessageFound => write!(f, "no message found"), + Self::Other => write!(f, "failed to get last raw message"), + Self::JetStream(err) => write!(f, "JetStream error: {}", err), } } } -#[derive(Debug, PartialEq, Clone)] -pub enum LastRawMessageErrorKind { - NoMessageFound, +pub type LastRawMessageError = Error; + +#[derive(Clone, Debug, PartialEq)] +pub enum ConsumerErrorKind { + //TODO: get last should have timeout, which should be mapped here. + TimedOut, + Request, + InvalidConsumerType, JetStream(super::errors::Error), Other, } -#[derive(Debug)] -pub struct ConsumerError { - pub kind: ConsumerErrorKind, - pub source: Option, -} - -crate::error_impls!(ConsumerError, ConsumerErrorKind); - -impl Display for ConsumerError { +impl Display for ConsumerErrorKind { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let source = self.format_source(); - match &self.kind() { - ConsumerErrorKind::TimedOut => write!(f, "timed out"), - ConsumerErrorKind::Request => write!(f, "request failed: {}", source), - ConsumerErrorKind::JetStream(err) => write!(f, "JetStream error: {}", err), - ConsumerErrorKind::Other => write!(f, "consumer error: {}", source), - ConsumerErrorKind::InvalidConsumerType => { - write!(f, "invalid consumer type: {}", source) - } + match self { + Self::TimedOut => write!(f, "timed out"), + Self::Request => write!(f, "request failed"), + Self::JetStream(err) => write!(f, "JetStream error: {}", err), + Self::Other => write!(f, "consumer error"), + Self::InvalidConsumerType => write!(f, "invalid consumer type"), } } } +pub type ConsumerError = Error; + impl From for ConsumerError { fn from(err: super::context::RequestError) -> Self { match err.kind() { @@ -1822,13 +1799,3 @@ impl From for ConsumerError { ConsumerError::new(ConsumerErrorKind::JetStream(err)) } } - -#[derive(Debug, PartialEq, Clone)] -pub enum ConsumerErrorKind { - //TODO: get last should have timeout, which should be mapped here. - TimedOut, - Request, - InvalidConsumerType, - JetStream(super::errors::Error), - Other, -} diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 3288e2a1a..200f88f7e 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -173,6 +173,7 @@ pub use auth::Auth; pub use client::{Client, PublishError, Request, RequestError, RequestErrorKind, SubscribeError}; pub use options::{AuthError, ConnectOptions}; +pub mod error; pub mod header; pub mod jetstream; pub mod message; @@ -856,55 +857,26 @@ pub enum ConnectErrorKind { Io, } -/// Returned when initial connection fails. -/// To be enumerate over the variants, call [ConnectError::kind]. -#[derive(Debug, Error)] -pub struct ConnectError { - kind: ConnectErrorKind, - source: Option, -} - -impl Display for ConnectError { +impl Display for ConnectErrorKind { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let source_info = self - .source - .as_ref() - .map(|s| s.to_string()) - .unwrap_or_else(|| "no details".to_string()); - match self.kind { - ConnectErrorKind::ServerParse => { - write!(f, "failed to parse server or server list: {}", source_info) - } - ConnectErrorKind::Dns => write!(f, "DNS error: {}", source_info), - ConnectErrorKind::Authentication => write!(f, "failed signing nonce"), - ConnectErrorKind::AuthorizationViolation => write!(f, "authorization violation"), - ConnectErrorKind::TimedOut => write!(f, "timed out"), - ConnectErrorKind::Tls => write!(f, "TLS error: {}", source_info), - ConnectErrorKind::Io => write!(f, "{}", source_info), + match self { + Self::ServerParse => write!(f, "failed to parse server or server list"), + Self::Dns => write!(f, "DNS error"), + Self::Authentication => write!(f, "failed signing nonce"), + Self::AuthorizationViolation => write!(f, "authorization violation"), + Self::TimedOut => write!(f, "timed out"), + Self::Tls => write!(f, "TLS error"), + Self::Io => write!(f, "IO error"), } } } -impl ConnectError { - fn with_source(kind: ConnectErrorKind, source: E) -> ConnectError - where - E: Into, - { - ConnectError { - kind, - source: Some(source.into()), - } - } - fn new(kind: ConnectErrorKind) -> ConnectError { - ConnectError { kind, source: None } - } - pub fn kind(&self) -> ConnectErrorKind { - self.kind - } -} +/// Returned when initial connection fails. +/// To be enumerate over the variants, call [ConnectError::kind]. +pub type ConnectError = error::Error; -impl From for ConnectError { - fn from(err: std::io::Error) -> Self { +impl From for ConnectError { + fn from(err: io::Error) -> Self { ConnectError::with_source(ConnectErrorKind::Io, err) } } @@ -1343,49 +1315,6 @@ macro_rules! from_with_timeout { } pub(crate) use from_with_timeout; -// TODO: rewrite into derivable proc macro. -macro_rules! error_impls { - ($t:ty, $k:ty) => { - impl $t { - #[allow(dead_code)] - #[allow(unreachable_pub)] - pub(crate) fn new(kind: $k) -> $t { - Self { kind, source: None } - } - #[allow(dead_code)] - #[allow(unreachable_pub)] - pub(crate) fn with_source(kind: $k, source: S) -> $t - where - S: Into, - { - Self { - kind, - source: Some(source.into()), - } - } - #[allow(dead_code)] - #[allow(unreachable_pub)] - pub fn kind(&self) -> $k { - // ALmost all `kind` types implement `Copy`, so it's almost always copy. - // We need to clone, as some more complex one may have nested other errors, that - // implement Clone only. - self.kind.clone() - } - #[allow(dead_code)] - #[allow(unreachable_pub)] - pub(crate) fn format_source(&self) -> String { - self.source - .as_ref() - .map(|err| err.to_string()) - .unwrap_or("unknown".to_string()) - } - } - impl std::error::Error for $t {} - }; -} - -pub(crate) use error_impls; - #[cfg(test)] mod tests { use super::*; diff --git a/async-nats/src/options.rs b/async-nats/src/options.rs index 2741896dc..fec11816b 100644 --- a/async-nats/src/options.rs +++ b/async-nats/src/options.rs @@ -18,7 +18,13 @@ use base64::engine::general_purpose::URL_SAFE_NO_PAD; use base64::engine::Engine; use futures::Future; use std::fmt::Formatter; -use std::{fmt, path::PathBuf, pin::Pin, sync::Arc, time::Duration}; +use std::{ + fmt, + path::{Path, PathBuf}, + pin::Pin, + sync::Arc, + time::Duration, +}; use tokio::io; use tokio_rustls::rustls; @@ -405,15 +411,15 @@ impl ConnectOptions { /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::ConnectError> { - /// let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds".into()) + /// let nc = async_nats::ConnectOptions::with_credentials_file("path/to/my.creds") /// .await? /// .connect("connect.ngs.global") /// .await?; /// # Ok(()) /// # } /// ``` - pub async fn with_credentials_file(path: PathBuf) -> io::Result { - let cred_file_contents = crate::auth_utils::load_creds(path).await?; + pub async fn with_credentials_file(path: impl AsRef) -> io::Result { + let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?; Self::with_credentials(&cred_file_contents) } @@ -426,15 +432,15 @@ impl ConnectOptions { /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::ConnectError> { /// let nc = async_nats::ConnectOptions::new() - /// .credentials_file("path/to/my.creds".into()) + /// .credentials_file("path/to/my.creds") /// .await? /// .connect("connect.ngs.global") /// .await?; /// # Ok(()) /// # } /// ``` - pub async fn credentials_file(self, path: PathBuf) -> io::Result { - let cred_file_contents = crate::auth_utils::load_creds(path).await?; + pub async fn credentials_file(self, path: impl AsRef) -> io::Result { + let cred_file_contents = crate::auth_utils::load_creds(path.as_ref()).await?; self.credentials(&cred_file_contents) } diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 8553f12d6..e5a7fa818 100644 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -2137,11 +2137,7 @@ mod jetstream { #[cfg(feature = "slow_tests")] #[tokio::test] async fn pull_consumer_stream_with_heartbeat() { - tracing_subscriber::fmt() - .with_max_level(Level::DEBUG) - .init(); - - use tracing::{debug, Level}; + use tracing::debug; let server = nats_server::run_server("tests/configs/jetstream.conf"); let client = ConnectOptions::new() .event_callback(|err| async move { println!("error: {err:?}") }) diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index f99387313..86903f722 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -757,6 +757,10 @@ mod kv { let name = test.get("name").await.unwrap(); assert_eq!(from_utf8(&name.unwrap()).unwrap(), "ivan".to_string()); + test.purge("name").await.unwrap(); + let name = test.get("name").await.unwrap(); + assert!(name.is_none()); + // Shutdown HUB and test get still work. drop(hub_server); diff --git a/nats-server/Cargo.toml b/nats-server/Cargo.toml index a1424f05a..14099dbf4 100644 --- a/nats-server/Cargo.toml +++ b/nats-server/Cargo.toml @@ -11,11 +11,11 @@ lazy_static = "1.4.0" regex = { version = "1.7.1", default-features = false, features = ["std", "unicode-perl"] } url = "2" json = "0.12" -nuid = "0.3.2" +nuid = "0.4.1" rand = "0.8" tokio-retry = "0.3.0" [dev-dependencies] -async-nats = "0.29" +async-nats = "0.31" tokio = { version = "1", features = ["full"] } futures = "0.3"