From b0123522c59774b84abdee468baad59af90aaa95 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Wed, 11 Aug 2021 17:57:07 +0200 Subject: [PATCH 1/5] Simplify stream configuration. More closely match the configuration semantics of the go JS client. --- src/jetstream.rs | 54 +++++++++----------- src/jetstream_types.rs | 112 ++++++++++++++++++++++++++--------------- 2 files changed, 96 insertions(+), 70 deletions(-) diff --git a/src/jetstream.rs b/src/jetstream.rs index f26fc6733..65fbd9231 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -445,7 +445,7 @@ impl NatsClient { S: AsRef, ConsumerConfig: From, { - let mut config = ConsumerConfig::from(cfg); + let config = ConsumerConfig::from(cfg); let stream = stream.as_ref(); if stream.is_empty() { return Err(Error::new( @@ -454,18 +454,13 @@ impl NatsClient { )); } - let subject = if let Some(durable_name) = &config.durable_name { - if durable_name.is_empty() { - config.durable_name = None; - format!("{}CONSUMER.CREATE.{}", self.api_prefix(), stream) - } else { - format!( - "{}CONSUMER.DURABLE.CREATE.{}.{}", - self.api_prefix(), - stream, - durable_name - ) - } + let subject = if !config.durable_name.is_empty() { + format!( + "{}CONSUMER.DURABLE.CREATE.{}.{}", + self.api_prefix(), + stream, + config.durable_name + ) } else { format!("{}CONSUMER.CREATE.{}", self.api_prefix(), stream) }; @@ -631,9 +626,9 @@ impl Consumer { let stream = stream.as_ref().to_string(); let cfg = ConsumerConfig::from(cfg); - if let Some(durable_name) = &cfg.durable_name { + if !cfg.durable_name.is_empty() { // attempt to create a durable config if it does not yet exist - let consumer_info = nc.consumer_info(&stream, durable_name); + let consumer_info = nc.consumer_info(&stream, &cfg.durable_name); if let Err(e) = consumer_info { if e.kind() == std::io::ErrorKind::Other { nc.create_consumer::<&str, &ConsumerConfig>(&stream, &cfg)?; @@ -660,12 +655,11 @@ impl Consumer { let stream = stream.as_ref().to_string(); let cfg = ConsumerConfig::from(cfg); - let push_subscriber = - if let Some(deliver_subject) = &cfg.deliver_subject { - Some(nc.subscribe(deliver_subject)?) - } else { - None - }; + let push_subscriber = if !cfg.deliver_subject.is_empty() { + Some(nc.subscribe(&cfg.deliver_subject)?) + } else { + None + }; let mut dedupe_window = IntervalTree::default(); @@ -718,7 +712,7 @@ impl Consumer { let responses = if let Some(ps) = self.push_subscriber.as_ref() { ps } else { - if self.cfg.durable_name.is_none() { + if self.cfg.durable_name.is_empty() { return vec![Err(Error::new( ErrorKind::InvalidInput, "process and process_batch are only usable from \ @@ -730,7 +724,7 @@ impl Consumer { "{}CONSUMER.MSG.NEXT.{}.{}", self.api_prefix(), self.stream, - self.cfg.durable_name.as_ref().unwrap() + self.cfg.durable_name ); let sub = @@ -826,7 +820,7 @@ impl Consumer { let next = if let Some(ps) = &self.push_subscriber { ps.next().unwrap() } else { - if self.cfg.durable_name.is_none() { + if self.cfg.durable_name.is_empty() { return Err(Error::new( ErrorKind::InvalidInput, "process and process_batch are only usable from \ @@ -838,7 +832,7 @@ impl Consumer { "{}CONSUMER.MSG.NEXT.{}.{}", self.api_prefix(), self.stream, - self.cfg.durable_name.as_ref().unwrap() + self.cfg.durable_name ); self.nc.request(&subject, AckKind::Ack)? @@ -893,7 +887,7 @@ impl Consumer { let next = if let Some(ps) = &self.push_subscriber { ps.next_timeout(self.timeout)? } else { - if self.cfg.durable_name.is_none() { + if self.cfg.durable_name.is_empty() { return Err(Error::new( ErrorKind::InvalidInput, "process and process_batch are only usable from \ @@ -905,7 +899,7 @@ impl Consumer { "{}CONSUMER.MSG.NEXT.{}.{}", self.api_prefix(), self.stream, - self.cfg.durable_name.as_ref().unwrap() + self.cfg.durable_name ); self.nc.request_timeout(&subject, b"", self.timeout)? @@ -963,7 +957,7 @@ impl Consumer { &mut self, next_request: NextRequest, ) -> io::Result { - if self.cfg.durable_name.is_none() { + if self.cfg.durable_name.is_empty() { return Err(Error::new( ErrorKind::InvalidInput, "this method is only usable from \ @@ -971,7 +965,7 @@ impl Consumer { )); } - if self.cfg.deliver_subject.is_some() { + if !self.cfg.deliver_subject.is_empty() { return Err(Error::new( ErrorKind::InvalidInput, "this method is only usable from \ @@ -983,7 +977,7 @@ impl Consumer { "{}CONSUMER.MSG.NEXT.{}.{}", self.api_prefix(), self.stream, - self.cfg.durable_name.as_ref().unwrap() + self.cfg.durable_name ); let req = serde_json::ser::to_vec(&next_request).unwrap(); diff --git a/src/jetstream_types.rs b/src/jetstream_types.rs index 066649c1e..afefa8175 100644 --- a/src/jetstream_types.rs +++ b/src/jetstream_types.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use chrono::{DateTime as ChronoDateTime, Utc}; /// A UTC time -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] pub struct DateTime(pub ChronoDateTime); impl Default for DateTime { @@ -33,7 +33,7 @@ pub(crate) struct CreateConsumerRequest { /// Configuration for consumers. From a high level, the /// `durable_name` and `deliver_subject` fields have a particularly /// strong influence on the consumer's overall behavior. -#[derive(Debug, Default, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct ConsumerConfig { /// Setting `deliver_subject` to `Some(...)` will cause this consumer /// to be "push-based". This is analogous in some ways to a normal @@ -65,7 +65,8 @@ pub struct ConsumerConfig { /// "exactly once" semantics, it is necessary to implement idempotent /// semantics in any system that is written to as a result of processing /// a message. - pub deliver_subject: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub deliver_subject: String, /// Setting `durable_name` to `Some(...)` will cause this consumer /// to be "durable". This may be a good choice for workloads that @@ -82,35 +83,45 @@ pub struct ConsumerConfig { /// progress in the case of a crash, such as certain "high churn" /// workloads or workloads where a crashed instance is not required /// to recover. - pub durable_name: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub durable_name: String, /// Allows for a variety of options that determine how this consumer will receive messages pub deliver_policy: DeliverPolicy, /// Used in combination with `DeliverPolicy::ByStartSeq` to only select messages arriving /// after this sequence number. - pub opt_start_seq: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub opt_start_seq: i64, /// Used in combination with `DeliverPolicy::ByStartTime` to only select messages arriving /// after this time. - pub opt_start_time: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub opt_start_time: DateTime, /// How messages should be acknowledged pub ack_policy: AckPolicy, /// How long to allow messages to remain un-acknowledged before attempting redelivery - pub ack_wait: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub ack_wait: isize, /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever. - pub max_deliver: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub max_deliver: i64, /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards. - pub filter_subject: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub filter_subject: String, /// Whether messages are sent as quickly as possible or at the rate of receipt pub replay_policy: ReplayPolicy, /// The rate of message delivery in bits per second - pub rate_limit: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub rate_limit: i64, /// What percentage of acknowledgements should be samples for observability, 0-100 - pub sample_frequency: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub sample_frequency: u8, /// The maximum number of waiting consumers. - pub max_waiting: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub max_waiting: i64, /// The maximum number of unacknowledged messages that may be /// in-flight before pausing sending additional messages to /// this consumer. - pub max_ack_pending: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub max_ack_pending: i64, } impl From<&ConsumerConfig> for ConsumerConfig { @@ -122,7 +133,7 @@ impl From<&ConsumerConfig> for ConsumerConfig { impl From<&str> for ConsumerConfig { fn from(s: &str) -> ConsumerConfig { ConsumerConfig { - durable_name: Some(s.to_string()), + durable_name: s.to_string(), ..Default::default() } } @@ -131,38 +142,47 @@ impl From<&str> for ConsumerConfig { /// `StreamConfig` determines the properties for a stream. /// There are sensible defaults for most. If no subjects are /// given the name will be used as the only subject. -#[derive(Debug, Default, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct StreamConfig { /// A name for the Stream. Must not have spaces, tabs or period `.` characters pub name: String, /// How large the Stream may become in total bytes before the configured discard policy kicks in - pub max_bytes: i64, + pub max_bytes: u64, /// How large the Stream may become in total messages before the configured discard policy kicks in - pub max_msgs: i64, + pub max_msgs: u64, /// When a Stream has reached its configured `max_bytes` or `max_msgs`, this policy kicks in. /// `DiscardPolicy::New` refuses new messages or `DiscardPolicy::Old` (default) deletes old messages to make space pub discard: DiscardPolicy, /// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the /// configured stream `name`. - pub subjects: Option>, + #[serde(default, skip_serializing_if = "is_default")] + pub subjects: Vec, /// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue` pub retention: RetentionPolicy, /// How many Consumers can be defined for a given Stream, -1 for unlimited pub max_consumers: isize, /// Maximum age of any message in the stream, expressed in nanoseconds - pub max_age: isize, + pub max_age: u64, /// The largest message that will be accepted by the Stream - pub max_msg_size: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub max_msg_size: usize, /// The type of storage backend, `File` (default) and `Memory` pub storage: StorageType, /// How many replicas to keep for each message in a clustered JetStream, maximum 5 pub num_replicas: usize, /// Disables acknowledging messages that are received by the Stream - pub no_ack: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub no_ack: bool, /// The window within which to track duplicate messages. - pub duplicate_window: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub duplicate_window: u64, /// The owner of the template associated with this stream. - pub template_owner: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub template_owner: String, +} + +fn is_default(t: &T) -> bool { + t == &T::default() } impl From<&StreamConfig> for StreamConfig { @@ -230,7 +250,7 @@ pub struct StreamState { } /// `DeliverPolicy` determines how the consumer should select the first message to deliver. -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum DeliverPolicy { /// All causes the consumer to receive the oldest messages still present in the system. @@ -262,7 +282,7 @@ impl Default for DeliverPolicy { /// Determines whether messages will be acknowledged individually, /// in batches, or never. -#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum AckPolicy { /// All messages will be individually acknowledged. This is the default. @@ -285,7 +305,7 @@ impl Default for AckPolicy { /// `ReplayPolicy` controls whether messages are sent to a consumer /// as quickly as possible or at the rate that they were originally received at. -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum ReplayPolicy { /// Sends all messages in a stream to the consumer as quickly as possible. This is the default. @@ -314,7 +334,7 @@ pub struct PurgeResponse { } /// `RetentionPolicy` determines how messages in a set are retained. -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum RetentionPolicy { /// `Limits` (default) means that messages are retained until any given limit is reached. @@ -337,7 +357,7 @@ impl Default for RetentionPolicy { /// `DiscardPolicy` determines how we proceed when limits of messages or bytes are hit. The default, `Old` will /// remove older messages. `New` will fail to store the new message. -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum DiscardPolicy { /// will remove older messages when limits are hit. @@ -355,7 +375,7 @@ impl Default for DiscardPolicy { } /// determines how messages are stored for retention. -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum StorageType { /// Stream data is kept in files. This is the default. @@ -373,7 +393,9 @@ impl Default for StorageType { } /// Various limits imposed on a particular account. -#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] +#[derive( + Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, +)] pub struct AccountLimits { /// Maximum memory for this account (-1 if no limit) pub max_memory: i64, @@ -386,7 +408,9 @@ pub struct AccountLimits { } /// returns current statistics about the account's `JetStream` usage. -#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] +#[derive( + Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, +)] pub(crate) struct AccountStats { pub memory: u64, pub storage: u64, @@ -394,11 +418,12 @@ pub(crate) struct AccountStats { pub limits: AccountLimits, } -#[derive(Debug, Default, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)] pub(crate) struct PubAck { pub stream: String, pub seq: u64, - pub duplicate: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub duplicate: bool, } /// The kinds of response used for acknowledging a processed message. @@ -437,7 +462,7 @@ impl AsRef<[u8]> for AckKind { } /// Information about a consumer -#[derive(Debug, Default, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct ConsumerInfo { /// The stream being consumed pub stream_name: String, @@ -465,14 +490,16 @@ pub struct ConsumerInfo { } /// Information about the consumer's associated `JetStream` cluster -#[derive(Debug, Default, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct ClusterInfo { /// The leader of the cluster pub leader: String, } /// Information about a consumer and the stream it is consuming -#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] +#[derive( + Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, +)] pub struct SequencePair { /// How far along the consumer has progressed pub consumer_seq: u64, @@ -481,23 +508,28 @@ pub struct SequencePair { } /// for getting next messages for pull based consumers. -#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] +#[derive( + Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, +)] pub struct NextRequest { /// The number of messages that are being requested to be delivered. pub batch: usize, /// The optional number of nanoseconds that the server will store this next request for /// before forgetting about the pending batch size. - pub expires: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub expires: usize, /// This optionally causes the server not to store this pending request at all, but when there are no /// messages to deliver will send a nil bytes message with a Status header of 404, this way you /// can know when you reached the end of the stream for example. A 409 is returned if the /// Consumer has reached MaxAckPending limits. - pub no_wait: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub no_wait: bool, } #[derive(Debug, Default, Serialize, Deserialize, Clone)] pub(crate) struct StreamRequest { - pub subject: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub subject: String, } /// options for subscription From 2f32babfcd64a708681995603eb8f9cde5b6b499 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Wed, 11 Aug 2021 18:47:22 +0200 Subject: [PATCH 2/5] Re-add some important ergonomic rustisms to JS configuration --- src/jetstream.rs | 41 +++++++++++++++++++++++------------------ src/jetstream_types.rs | 31 ++++++++++++++++--------------- tests/jetstream.rs | 4 ++-- 3 files changed, 41 insertions(+), 35 deletions(-) diff --git a/src/jetstream.rs b/src/jetstream.rs index 65fbd9231..40fd50be4 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -454,12 +454,12 @@ impl NatsClient { )); } - let subject = if !config.durable_name.is_empty() { + let subject = if let Some(ref durable_name) = config.durable_name { format!( "{}CONSUMER.DURABLE.CREATE.{}.{}", self.api_prefix(), stream, - config.durable_name + durable_name ) } else { format!("{}CONSUMER.CREATE.{}", self.api_prefix(), stream) @@ -554,6 +554,10 @@ impl NatsClient { match res { ApiResponse::Ok(stream_info) => Ok(stream_info), ApiResponse::Err { error, .. } => { + log::error!( + "failed to parse API response: {:?}", + std::str::from_utf8(&res_msg.data) + ); if let Some(desc) = error.description { Err(Error::new(ErrorKind::Other, desc)) } else { @@ -626,9 +630,9 @@ impl Consumer { let stream = stream.as_ref().to_string(); let cfg = ConsumerConfig::from(cfg); - if !cfg.durable_name.is_empty() { + if let Some(ref durable_name) = cfg.durable_name { // attempt to create a durable config if it does not yet exist - let consumer_info = nc.consumer_info(&stream, &cfg.durable_name); + let consumer_info = nc.consumer_info(&stream, durable_name); if let Err(e) = consumer_info { if e.kind() == std::io::ErrorKind::Other { nc.create_consumer::<&str, &ConsumerConfig>(&stream, &cfg)?; @@ -655,11 +659,12 @@ impl Consumer { let stream = stream.as_ref().to_string(); let cfg = ConsumerConfig::from(cfg); - let push_subscriber = if !cfg.deliver_subject.is_empty() { - Some(nc.subscribe(&cfg.deliver_subject)?) - } else { - None - }; + let push_subscriber = + if let Some(ref deliver_subject) = cfg.deliver_subject { + Some(nc.subscribe(&deliver_subject)?) + } else { + None + }; let mut dedupe_window = IntervalTree::default(); @@ -712,7 +717,7 @@ impl Consumer { let responses = if let Some(ps) = self.push_subscriber.as_ref() { ps } else { - if self.cfg.durable_name.is_empty() { + if self.cfg.durable_name.is_none() { return vec![Err(Error::new( ErrorKind::InvalidInput, "process and process_batch are only usable from \ @@ -724,7 +729,7 @@ impl Consumer { "{}CONSUMER.MSG.NEXT.{}.{}", self.api_prefix(), self.stream, - self.cfg.durable_name + self.cfg.durable_name.as_ref().unwrap() ); let sub = @@ -820,7 +825,7 @@ impl Consumer { let next = if let Some(ps) = &self.push_subscriber { ps.next().unwrap() } else { - if self.cfg.durable_name.is_empty() { + if self.cfg.durable_name.is_none() { return Err(Error::new( ErrorKind::InvalidInput, "process and process_batch are only usable from \ @@ -832,7 +837,7 @@ impl Consumer { "{}CONSUMER.MSG.NEXT.{}.{}", self.api_prefix(), self.stream, - self.cfg.durable_name + self.cfg.durable_name.as_ref().unwrap() ); self.nc.request(&subject, AckKind::Ack)? @@ -887,7 +892,7 @@ impl Consumer { let next = if let Some(ps) = &self.push_subscriber { ps.next_timeout(self.timeout)? } else { - if self.cfg.durable_name.is_empty() { + if self.cfg.durable_name.is_none() { return Err(Error::new( ErrorKind::InvalidInput, "process and process_batch are only usable from \ @@ -899,7 +904,7 @@ impl Consumer { "{}CONSUMER.MSG.NEXT.{}.{}", self.api_prefix(), self.stream, - self.cfg.durable_name + self.cfg.durable_name.as_ref().unwrap() ); self.nc.request_timeout(&subject, b"", self.timeout)? @@ -957,7 +962,7 @@ impl Consumer { &mut self, next_request: NextRequest, ) -> io::Result { - if self.cfg.durable_name.is_empty() { + if self.cfg.durable_name.is_none() { return Err(Error::new( ErrorKind::InvalidInput, "this method is only usable from \ @@ -965,7 +970,7 @@ impl Consumer { )); } - if !self.cfg.deliver_subject.is_empty() { + if self.cfg.deliver_subject.is_some() { return Err(Error::new( ErrorKind::InvalidInput, "this method is only usable from \ @@ -977,7 +982,7 @@ impl Consumer { "{}CONSUMER.MSG.NEXT.{}.{}", self.api_prefix(), self.stream, - self.cfg.durable_name + self.cfg.durable_name.as_ref().unwrap() ); let req = serde_json::ser::to_vec(&next_request).unwrap(); diff --git a/src/jetstream_types.rs b/src/jetstream_types.rs index afefa8175..7ee69c0c6 100644 --- a/src/jetstream_types.rs +++ b/src/jetstream_types.rs @@ -65,8 +65,8 @@ pub struct ConsumerConfig { /// "exactly once" semantics, it is necessary to implement idempotent /// semantics in any system that is written to as a result of processing /// a message. - #[serde(default, skip_serializing_if = "is_default")] - pub deliver_subject: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub deliver_subject: Option, /// Setting `durable_name` to `Some(...)` will cause this consumer /// to be "durable". This may be a good choice for workloads that @@ -83,8 +83,8 @@ pub struct ConsumerConfig { /// progress in the case of a crash, such as certain "high churn" /// workloads or workloads where a crashed instance is not required /// to recover. - #[serde(default, skip_serializing_if = "is_default")] - pub durable_name: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub durable_name: Option, /// Allows for a variety of options that determine how this consumer will receive messages pub deliver_policy: DeliverPolicy, /// Used in combination with `DeliverPolicy::ByStartSeq` to only select messages arriving @@ -94,7 +94,7 @@ pub struct ConsumerConfig { /// Used in combination with `DeliverPolicy::ByStartTime` to only select messages arriving /// after this time. #[serde(default, skip_serializing_if = "is_default")] - pub opt_start_time: DateTime, + pub opt_start_time: Option, /// How messages should be acknowledged pub ack_policy: AckPolicy, /// How long to allow messages to remain un-acknowledged before attempting redelivery @@ -133,7 +133,7 @@ impl From<&ConsumerConfig> for ConsumerConfig { impl From<&str> for ConsumerConfig { fn from(s: &str) -> ConsumerConfig { ConsumerConfig { - durable_name: s.to_string(), + durable_name: Some(s.to_string()), ..Default::default() } } @@ -147,25 +147,25 @@ pub struct StreamConfig { /// A name for the Stream. Must not have spaces, tabs or period `.` characters pub name: String, /// How large the Stream may become in total bytes before the configured discard policy kicks in - pub max_bytes: u64, + pub max_bytes: i64, /// How large the Stream may become in total messages before the configured discard policy kicks in - pub max_msgs: u64, + pub max_msgs: i64, /// When a Stream has reached its configured `max_bytes` or `max_msgs`, this policy kicks in. /// `DiscardPolicy::New` refuses new messages or `DiscardPolicy::Old` (default) deletes old messages to make space pub discard: DiscardPolicy, /// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the /// configured stream `name`. #[serde(default, skip_serializing_if = "is_default")] - pub subjects: Vec, + pub subjects: Option>, /// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue` pub retention: RetentionPolicy, /// How many Consumers can be defined for a given Stream, -1 for unlimited - pub max_consumers: isize, + pub max_consumers: i32, /// Maximum age of any message in the stream, expressed in nanoseconds - pub max_age: u64, + pub max_age: i64, /// The largest message that will be accepted by the Stream #[serde(default, skip_serializing_if = "is_default")] - pub max_msg_size: usize, + pub max_msg_size: i32, /// The type of storage backend, `File` (default) and `Memory` pub storage: StorageType, /// How many replicas to keep for each message in a clustered JetStream, maximum 5 @@ -175,14 +175,15 @@ pub struct StreamConfig { pub no_ack: bool, /// The window within which to track duplicate messages. #[serde(default, skip_serializing_if = "is_default")] - pub duplicate_window: u64, + pub duplicate_window: i64, /// The owner of the template associated with this stream. #[serde(default, skip_serializing_if = "is_default")] pub template_owner: String, } -fn is_default(t: &T) -> bool { - t == &T::default() +fn is_default(_t: &T) -> bool { + // t == &T::default() + false } impl From<&StreamConfig> for StreamConfig { diff --git a/tests/jetstream.rs b/tests/jetstream.rs index 113831a29..bdd9a3141 100644 --- a/tests/jetstream.rs +++ b/tests/jetstream.rs @@ -142,7 +142,7 @@ fn jetstream_basics() -> io::Result<()> { #[test] fn jetstream_libdoc_test() { - use nats::jetstream::{AckPolicy, Consumer, ConsumerConfig}; + use nats::jetstream::Consumer; let server = server(); @@ -185,5 +185,5 @@ fn jetstream_libdoc_test() { let results: Vec> = consumer.process_batch(batch_size, |msg| Ok(msg.data.len())); let flipped: std::io::Result> = results.into_iter().collect(); - let sizes: Vec = flipped.unwrap(); + let _sizes: Vec = flipped.unwrap(); } From b7c14493eaade496861eb254e7b884bfc2cf900c Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Wed, 11 Aug 2021 18:51:04 +0200 Subject: [PATCH 3/5] Update changelog and bump version --- CHANGELOG.md | 8 ++++++++ Cargo.toml | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6464013ae..0e243d7d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# 0.13.0 + +## Breaking Changes + +- #197 Some JetStream configuration objects have been + simplified while more closely matching the Golang + JS client semantics. + # 0.12.1 ## Improvements diff --git a/Cargo.toml b/Cargo.toml index 0820011f9..5e30591f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nats" -version = "0.12.1" +version = "0.13.0" description = "A Rust NATS client" authors = ["Derek Collison ", "Tyler Neely ", "Stjepan Glavina "] edition = "2018" From 5d1e28fdec1bbcfb67c57d6db02b33bb91b72819 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Wed, 11 Aug 2021 18:51:53 +0200 Subject: [PATCH 4/5] Update changelog with additional improvements --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e243d7d6..70f70d632 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # 0.13.0 +## Improvements + +- #197 JetStream configuration objects now implement + PartialEq and Eq. + ## Breaking Changes - #197 Some JetStream configuration objects have been From f1a6d9cad9e21e2bfa720e9ddf8543c1496ce03a Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Wed, 11 Aug 2021 18:58:32 +0200 Subject: [PATCH 5/5] Clippy feedback --- src/jetstream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/jetstream.rs b/src/jetstream.rs index 40fd50be4..92d65a496 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -661,7 +661,7 @@ impl Consumer { let push_subscriber = if let Some(ref deliver_subject) = cfg.deliver_subject { - Some(nc.subscribe(&deliver_subject)?) + Some(nc.subscribe(deliver_subject)?) } else { None };