diff --git a/CHANGELOG.md b/CHANGELOG.md index 6464013ae..70f70d632 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,16 @@ +# 0.13.0 + +## Improvements + +- #197 JetStream configuration objects now implement + PartialEq and Eq. + +## Breaking Changes + +- #197 Some JetStream configuration objects have been + simplified while more closely matching the Golang + JS client semantics. + # 0.12.1 ## Improvements diff --git a/Cargo.toml b/Cargo.toml index 0820011f9..5e30591f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nats" -version = "0.12.1" +version = "0.13.0" description = "A Rust NATS client" authors = ["Derek Collison ", "Tyler Neely ", "Stjepan Glavina "] edition = "2018" diff --git a/src/jetstream.rs b/src/jetstream.rs index f26fc6733..92d65a496 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -445,7 +445,7 @@ impl NatsClient { S: AsRef, ConsumerConfig: From, { - let mut config = ConsumerConfig::from(cfg); + let config = ConsumerConfig::from(cfg); let stream = stream.as_ref(); if stream.is_empty() { return Err(Error::new( @@ -454,18 +454,13 @@ impl NatsClient { )); } - let subject = if let Some(durable_name) = &config.durable_name { - if durable_name.is_empty() { - config.durable_name = None; - format!("{}CONSUMER.CREATE.{}", self.api_prefix(), stream) - } else { - format!( - "{}CONSUMER.DURABLE.CREATE.{}.{}", - self.api_prefix(), - stream, - durable_name - ) - } + let subject = if let Some(ref durable_name) = config.durable_name { + format!( + "{}CONSUMER.DURABLE.CREATE.{}.{}", + self.api_prefix(), + stream, + durable_name + ) } else { format!("{}CONSUMER.CREATE.{}", self.api_prefix(), stream) }; @@ -559,6 +554,10 @@ impl NatsClient { match res { ApiResponse::Ok(stream_info) => Ok(stream_info), ApiResponse::Err { error, .. } => { + log::error!( + "failed to parse API response: {:?}", + std::str::from_utf8(&res_msg.data) + ); if let Some(desc) = error.description { Err(Error::new(ErrorKind::Other, desc)) } else { @@ -631,7 +630,7 @@ impl Consumer { let stream = stream.as_ref().to_string(); let cfg = ConsumerConfig::from(cfg); - if let Some(durable_name) = &cfg.durable_name { + if let Some(ref durable_name) = cfg.durable_name { // attempt to create a durable config if it does not yet exist let consumer_info = nc.consumer_info(&stream, durable_name); if let Err(e) = consumer_info { @@ -661,7 +660,7 @@ impl Consumer { let cfg = ConsumerConfig::from(cfg); let push_subscriber = - if let Some(deliver_subject) = &cfg.deliver_subject { + if let Some(ref deliver_subject) = cfg.deliver_subject { Some(nc.subscribe(deliver_subject)?) } else { None diff --git a/src/jetstream_types.rs b/src/jetstream_types.rs index 066649c1e..7ee69c0c6 100644 --- a/src/jetstream_types.rs +++ b/src/jetstream_types.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use chrono::{DateTime as ChronoDateTime, Utc}; /// A UTC time -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] pub struct DateTime(pub ChronoDateTime); impl Default for DateTime { @@ -33,7 +33,7 @@ pub(crate) struct CreateConsumerRequest { /// Configuration for consumers. From a high level, the /// `durable_name` and `deliver_subject` fields have a particularly /// strong influence on the consumer's overall behavior. -#[derive(Debug, Default, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct ConsumerConfig { /// Setting `deliver_subject` to `Some(...)` will cause this consumer /// to be "push-based". This is analogous in some ways to a normal @@ -65,6 +65,7 @@ pub struct ConsumerConfig { /// "exactly once" semantics, it is necessary to implement idempotent /// semantics in any system that is written to as a result of processing /// a message. + #[serde(default, skip_serializing_if = "Option::is_none")] pub deliver_subject: Option, /// Setting `durable_name` to `Some(...)` will cause this consumer @@ -82,35 +83,45 @@ pub struct ConsumerConfig { /// progress in the case of a crash, such as certain "high churn" /// workloads or workloads where a crashed instance is not required /// to recover. + #[serde(default, skip_serializing_if = "Option::is_none")] pub durable_name: Option, /// Allows for a variety of options that determine how this consumer will receive messages pub deliver_policy: DeliverPolicy, /// Used in combination with `DeliverPolicy::ByStartSeq` to only select messages arriving /// after this sequence number. - pub opt_start_seq: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub opt_start_seq: i64, /// Used in combination with `DeliverPolicy::ByStartTime` to only select messages arriving /// after this time. + #[serde(default, skip_serializing_if = "is_default")] pub opt_start_time: Option, /// How messages should be acknowledged pub ack_policy: AckPolicy, /// How long to allow messages to remain un-acknowledged before attempting redelivery - pub ack_wait: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub ack_wait: isize, /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever. - pub max_deliver: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub max_deliver: i64, /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards. - pub filter_subject: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub filter_subject: String, /// Whether messages are sent as quickly as possible or at the rate of receipt pub replay_policy: ReplayPolicy, /// The rate of message delivery in bits per second - pub rate_limit: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub rate_limit: i64, /// What percentage of acknowledgements should be samples for observability, 0-100 - pub sample_frequency: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub sample_frequency: u8, /// The maximum number of waiting consumers. - pub max_waiting: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub max_waiting: i64, /// The maximum number of unacknowledged messages that may be /// in-flight before pausing sending additional messages to /// this consumer. - pub max_ack_pending: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub max_ack_pending: i64, } impl From<&ConsumerConfig> for ConsumerConfig { @@ -131,7 +142,7 @@ impl From<&str> for ConsumerConfig { /// `StreamConfig` determines the properties for a stream. /// There are sensible defaults for most. If no subjects are /// given the name will be used as the only subject. -#[derive(Debug, Default, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct StreamConfig { /// A name for the Stream. Must not have spaces, tabs or period `.` characters pub name: String, @@ -144,25 +155,35 @@ pub struct StreamConfig { pub discard: DiscardPolicy, /// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the /// configured stream `name`. + #[serde(default, skip_serializing_if = "is_default")] pub subjects: Option>, /// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue` pub retention: RetentionPolicy, /// How many Consumers can be defined for a given Stream, -1 for unlimited - pub max_consumers: isize, + pub max_consumers: i32, /// Maximum age of any message in the stream, expressed in nanoseconds - pub max_age: isize, + pub max_age: i64, /// The largest message that will be accepted by the Stream - pub max_msg_size: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub max_msg_size: i32, /// The type of storage backend, `File` (default) and `Memory` pub storage: StorageType, /// How many replicas to keep for each message in a clustered JetStream, maximum 5 pub num_replicas: usize, /// Disables acknowledging messages that are received by the Stream - pub no_ack: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub no_ack: bool, /// The window within which to track duplicate messages. - pub duplicate_window: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub duplicate_window: i64, /// The owner of the template associated with this stream. - pub template_owner: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub template_owner: String, +} + +fn is_default(_t: &T) -> bool { + // t == &T::default() + false } impl From<&StreamConfig> for StreamConfig { @@ -230,7 +251,7 @@ pub struct StreamState { } /// `DeliverPolicy` determines how the consumer should select the first message to deliver. -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum DeliverPolicy { /// All causes the consumer to receive the oldest messages still present in the system. @@ -262,7 +283,7 @@ impl Default for DeliverPolicy { /// Determines whether messages will be acknowledged individually, /// in batches, or never. -#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum AckPolicy { /// All messages will be individually acknowledged. This is the default. @@ -285,7 +306,7 @@ impl Default for AckPolicy { /// `ReplayPolicy` controls whether messages are sent to a consumer /// as quickly as possible or at the rate that they were originally received at. -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum ReplayPolicy { /// Sends all messages in a stream to the consumer as quickly as possible. This is the default. @@ -314,7 +335,7 @@ pub struct PurgeResponse { } /// `RetentionPolicy` determines how messages in a set are retained. -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum RetentionPolicy { /// `Limits` (default) means that messages are retained until any given limit is reached. @@ -337,7 +358,7 @@ impl Default for RetentionPolicy { /// `DiscardPolicy` determines how we proceed when limits of messages or bytes are hit. The default, `Old` will /// remove older messages. `New` will fail to store the new message. -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum DiscardPolicy { /// will remove older messages when limits are hit. @@ -355,7 +376,7 @@ impl Default for DiscardPolicy { } /// determines how messages are stored for retention. -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum StorageType { /// Stream data is kept in files. This is the default. @@ -373,7 +394,9 @@ impl Default for StorageType { } /// Various limits imposed on a particular account. -#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] +#[derive( + Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, +)] pub struct AccountLimits { /// Maximum memory for this account (-1 if no limit) pub max_memory: i64, @@ -386,7 +409,9 @@ pub struct AccountLimits { } /// returns current statistics about the account's `JetStream` usage. -#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] +#[derive( + Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, +)] pub(crate) struct AccountStats { pub memory: u64, pub storage: u64, @@ -394,11 +419,12 @@ pub(crate) struct AccountStats { pub limits: AccountLimits, } -#[derive(Debug, Default, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)] pub(crate) struct PubAck { pub stream: String, pub seq: u64, - pub duplicate: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub duplicate: bool, } /// The kinds of response used for acknowledging a processed message. @@ -437,7 +463,7 @@ impl AsRef<[u8]> for AckKind { } /// Information about a consumer -#[derive(Debug, Default, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct ConsumerInfo { /// The stream being consumed pub stream_name: String, @@ -465,14 +491,16 @@ pub struct ConsumerInfo { } /// Information about the consumer's associated `JetStream` cluster -#[derive(Debug, Default, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct ClusterInfo { /// The leader of the cluster pub leader: String, } /// Information about a consumer and the stream it is consuming -#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] +#[derive( + Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, +)] pub struct SequencePair { /// How far along the consumer has progressed pub consumer_seq: u64, @@ -481,23 +509,28 @@ pub struct SequencePair { } /// for getting next messages for pull based consumers. -#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] +#[derive( + Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, +)] pub struct NextRequest { /// The number of messages that are being requested to be delivered. pub batch: usize, /// The optional number of nanoseconds that the server will store this next request for /// before forgetting about the pending batch size. - pub expires: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub expires: usize, /// This optionally causes the server not to store this pending request at all, but when there are no /// messages to deliver will send a nil bytes message with a Status header of 404, this way you /// can know when you reached the end of the stream for example. A 409 is returned if the /// Consumer has reached MaxAckPending limits. - pub no_wait: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub no_wait: bool, } #[derive(Debug, Default, Serialize, Deserialize, Clone)] pub(crate) struct StreamRequest { - pub subject: Option, + #[serde(default, skip_serializing_if = "is_default")] + pub subject: String, } /// options for subscription diff --git a/tests/jetstream.rs b/tests/jetstream.rs index 113831a29..bdd9a3141 100644 --- a/tests/jetstream.rs +++ b/tests/jetstream.rs @@ -142,7 +142,7 @@ fn jetstream_basics() -> io::Result<()> { #[test] fn jetstream_libdoc_test() { - use nats::jetstream::{AckPolicy, Consumer, ConsumerConfig}; + use nats::jetstream::Consumer; let server = server(); @@ -185,5 +185,5 @@ fn jetstream_libdoc_test() { let results: Vec> = consumer.process_batch(batch_size, |msg| Ok(msg.data.len())); let flipped: std::io::Result> = results.into_iter().collect(); - let sizes: Vec = flipped.unwrap(); + let _sizes: Vec = flipped.unwrap(); }