Skip to content

Commit

Permalink
Merge pull request #197 from nats-io/tyler_js_config_ergonomics
Browse files Browse the repository at this point in the history
js config ergonomics. Closes #191.
  • Loading branch information
spacejam committed Aug 11, 2021
2 parents b55c453 + f1a6d9c commit 88bdfbd
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 52 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# 0.13.0

## Improvements

- #197 JetStream configuration objects now implement
PartialEq and Eq.

## Breaking Changes

- #197 Some JetStream configuration objects have been
simplified while more closely matching the Golang
JS client semantics.

# 0.12.1

## Improvements
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "nats"
version = "0.12.1"
version = "0.13.0"
description = "A Rust NATS client"
authors = ["Derek Collison <derek@nats.io>", "Tyler Neely <tyler@nats.io>", "Stjepan Glavina <stjepan@nats.io>"]
edition = "2018"
Expand Down
29 changes: 14 additions & 15 deletions src/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl NatsClient {
S: AsRef<str>,
ConsumerConfig: From<C>,
{
let mut config = ConsumerConfig::from(cfg);
let config = ConsumerConfig::from(cfg);
let stream = stream.as_ref();
if stream.is_empty() {
return Err(Error::new(
Expand All @@ -454,18 +454,13 @@ impl NatsClient {
));
}

let subject = if let Some(durable_name) = &config.durable_name {
if durable_name.is_empty() {
config.durable_name = None;
format!("{}CONSUMER.CREATE.{}", self.api_prefix(), stream)
} else {
format!(
"{}CONSUMER.DURABLE.CREATE.{}.{}",
self.api_prefix(),
stream,
durable_name
)
}
let subject = if let Some(ref durable_name) = config.durable_name {
format!(
"{}CONSUMER.DURABLE.CREATE.{}.{}",
self.api_prefix(),
stream,
durable_name
)
} else {
format!("{}CONSUMER.CREATE.{}", self.api_prefix(), stream)
};
Expand Down Expand Up @@ -559,6 +554,10 @@ impl NatsClient {
match res {
ApiResponse::Ok(stream_info) => Ok(stream_info),
ApiResponse::Err { error, .. } => {
log::error!(
"failed to parse API response: {:?}",
std::str::from_utf8(&res_msg.data)
);
if let Some(desc) = error.description {
Err(Error::new(ErrorKind::Other, desc))
} else {
Expand Down Expand Up @@ -631,7 +630,7 @@ impl Consumer {
let stream = stream.as_ref().to_string();
let cfg = ConsumerConfig::from(cfg);

if let Some(durable_name) = &cfg.durable_name {
if let Some(ref durable_name) = cfg.durable_name {
// attempt to create a durable config if it does not yet exist
let consumer_info = nc.consumer_info(&stream, durable_name);
if let Err(e) = consumer_info {
Expand Down Expand Up @@ -661,7 +660,7 @@ impl Consumer {
let cfg = ConsumerConfig::from(cfg);

let push_subscriber =
if let Some(deliver_subject) = &cfg.deliver_subject {
if let Some(ref deliver_subject) = cfg.deliver_subject {
Some(nc.subscribe(deliver_subject)?)
} else {
None
Expand Down
101 changes: 67 additions & 34 deletions src/jetstream_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use chrono::{DateTime as ChronoDateTime, Utc};

/// A UTC time
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct DateTime(pub ChronoDateTime<Utc>);

impl Default for DateTime {
Expand Down Expand Up @@ -33,7 +33,7 @@ pub(crate) struct CreateConsumerRequest {
/// Configuration for consumers. From a high level, the
/// `durable_name` and `deliver_subject` fields have a particularly
/// strong influence on the consumer's overall behavior.
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ConsumerConfig {
/// Setting `deliver_subject` to `Some(...)` will cause this consumer
/// to be "push-based". This is analogous in some ways to a normal
Expand Down Expand Up @@ -65,6 +65,7 @@ pub struct ConsumerConfig {
/// "exactly once" semantics, it is necessary to implement idempotent
/// semantics in any system that is written to as a result of processing
/// a message.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub deliver_subject: Option<String>,

/// Setting `durable_name` to `Some(...)` will cause this consumer
Expand All @@ -82,35 +83,45 @@ pub struct ConsumerConfig {
/// progress in the case of a crash, such as certain "high churn"
/// workloads or workloads where a crashed instance is not required
/// to recover.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub durable_name: Option<String>,
/// Allows for a variety of options that determine how this consumer will receive messages
pub deliver_policy: DeliverPolicy,
/// Used in combination with `DeliverPolicy::ByStartSeq` to only select messages arriving
/// after this sequence number.
pub opt_start_seq: Option<i64>,
#[serde(default, skip_serializing_if = "is_default")]
pub opt_start_seq: i64,
/// Used in combination with `DeliverPolicy::ByStartTime` to only select messages arriving
/// after this time.
#[serde(default, skip_serializing_if = "is_default")]
pub opt_start_time: Option<DateTime>,
/// How messages should be acknowledged
pub ack_policy: AckPolicy,
/// How long to allow messages to remain un-acknowledged before attempting redelivery
pub ack_wait: Option<isize>,
#[serde(default, skip_serializing_if = "is_default")]
pub ack_wait: isize,
/// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
pub max_deliver: Option<i64>,
#[serde(default, skip_serializing_if = "is_default")]
pub max_deliver: i64,
/// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
pub filter_subject: Option<String>,
#[serde(default, skip_serializing_if = "is_default")]
pub filter_subject: String,
/// Whether messages are sent as quickly as possible or at the rate of receipt
pub replay_policy: ReplayPolicy,
/// The rate of message delivery in bits per second
pub rate_limit: Option<i64>,
#[serde(default, skip_serializing_if = "is_default")]
pub rate_limit: i64,
/// What percentage of acknowledgements should be samples for observability, 0-100
pub sample_frequency: Option<u8>,
#[serde(default, skip_serializing_if = "is_default")]
pub sample_frequency: u8,
/// The maximum number of waiting consumers.
pub max_waiting: Option<i64>,
#[serde(default, skip_serializing_if = "is_default")]
pub max_waiting: i64,
/// The maximum number of unacknowledged messages that may be
/// in-flight before pausing sending additional messages to
/// this consumer.
pub max_ack_pending: Option<i64>,
#[serde(default, skip_serializing_if = "is_default")]
pub max_ack_pending: i64,
}

impl From<&ConsumerConfig> for ConsumerConfig {
Expand All @@ -131,7 +142,7 @@ impl From<&str> for ConsumerConfig {
/// `StreamConfig` determines the properties for a stream.
/// There are sensible defaults for most. If no subjects are
/// given the name will be used as the only subject.
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct StreamConfig {
/// A name for the Stream. Must not have spaces, tabs or period `.` characters
pub name: String,
Expand All @@ -144,25 +155,35 @@ pub struct StreamConfig {
pub discard: DiscardPolicy,
/// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the
/// configured stream `name`.
#[serde(default, skip_serializing_if = "is_default")]
pub subjects: Option<Vec<String>>,
/// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue`
pub retention: RetentionPolicy,
/// How many Consumers can be defined for a given Stream, -1 for unlimited
pub max_consumers: isize,
pub max_consumers: i32,
/// Maximum age of any message in the stream, expressed in nanoseconds
pub max_age: isize,
pub max_age: i64,
/// The largest message that will be accepted by the Stream
pub max_msg_size: Option<i32>,
#[serde(default, skip_serializing_if = "is_default")]
pub max_msg_size: i32,
/// The type of storage backend, `File` (default) and `Memory`
pub storage: StorageType,
/// How many replicas to keep for each message in a clustered JetStream, maximum 5
pub num_replicas: usize,
/// Disables acknowledging messages that are received by the Stream
pub no_ack: Option<bool>,
#[serde(default, skip_serializing_if = "is_default")]
pub no_ack: bool,
/// The window within which to track duplicate messages.
pub duplicate_window: Option<isize>,
#[serde(default, skip_serializing_if = "is_default")]
pub duplicate_window: i64,
/// The owner of the template associated with this stream.
pub template_owner: Option<String>,
#[serde(default, skip_serializing_if = "is_default")]
pub template_owner: String,
}

fn is_default<T: Default + Eq>(_t: &T) -> bool {
// t == &T::default()
false
}

impl From<&StreamConfig> for StreamConfig {
Expand Down Expand Up @@ -230,7 +251,7 @@ pub struct StreamState {
}

/// `DeliverPolicy` determines how the consumer should select the first message to deliver.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DeliverPolicy {
/// All causes the consumer to receive the oldest messages still present in the system.
Expand Down Expand Up @@ -262,7 +283,7 @@ impl Default for DeliverPolicy {

/// Determines whether messages will be acknowledged individually,
/// in batches, or never.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum AckPolicy {
/// All messages will be individually acknowledged. This is the default.
Expand All @@ -285,7 +306,7 @@ impl Default for AckPolicy {

/// `ReplayPolicy` controls whether messages are sent to a consumer
/// as quickly as possible or at the rate that they were originally received at.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ReplayPolicy {
/// Sends all messages in a stream to the consumer as quickly as possible. This is the default.
Expand Down Expand Up @@ -314,7 +335,7 @@ pub struct PurgeResponse {
}

/// `RetentionPolicy` determines how messages in a set are retained.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RetentionPolicy {
/// `Limits` (default) means that messages are retained until any given limit is reached.
Expand All @@ -337,7 +358,7 @@ impl Default for RetentionPolicy {

/// `DiscardPolicy` determines how we proceed when limits of messages or bytes are hit. The default, `Old` will
/// remove older messages. `New` will fail to store the new message.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DiscardPolicy {
/// will remove older messages when limits are hit.
Expand All @@ -355,7 +376,7 @@ impl Default for DiscardPolicy {
}

/// determines how messages are stored for retention.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StorageType {
/// Stream data is kept in files. This is the default.
Expand All @@ -373,7 +394,9 @@ impl Default for StorageType {
}

/// Various limits imposed on a particular account.
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)]
#[derive(
Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq,
)]
pub struct AccountLimits {
/// Maximum memory for this account (-1 if no limit)
pub max_memory: i64,
Expand All @@ -386,19 +409,22 @@ pub struct AccountLimits {
}

/// returns current statistics about the account's `JetStream` usage.
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)]
#[derive(
Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq,
)]
pub(crate) struct AccountStats {
pub memory: u64,
pub storage: u64,
pub streams: usize,
pub limits: AccountLimits,
}

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub(crate) struct PubAck {
pub stream: String,
pub seq: u64,
pub duplicate: Option<bool>,
#[serde(default, skip_serializing_if = "is_default")]
pub duplicate: bool,
}

/// The kinds of response used for acknowledging a processed message.
Expand Down Expand Up @@ -437,7 +463,7 @@ impl AsRef<[u8]> for AckKind {
}

/// Information about a consumer
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ConsumerInfo {
/// The stream being consumed
pub stream_name: String,
Expand Down Expand Up @@ -465,14 +491,16 @@ pub struct ConsumerInfo {
}

/// Information about the consumer's associated `JetStream` cluster
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ClusterInfo {
/// The leader of the cluster
pub leader: String,
}

/// Information about a consumer and the stream it is consuming
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)]
#[derive(
Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq,
)]
pub struct SequencePair {
/// How far along the consumer has progressed
pub consumer_seq: u64,
Expand All @@ -481,23 +509,28 @@ pub struct SequencePair {
}

/// for getting next messages for pull based consumers.
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)]
#[derive(
Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq,
)]
pub struct NextRequest {
/// The number of messages that are being requested to be delivered.
pub batch: usize,
/// The optional number of nanoseconds that the server will store this next request for
/// before forgetting about the pending batch size.
pub expires: Option<usize>,
#[serde(default, skip_serializing_if = "is_default")]
pub expires: usize,
/// This optionally causes the server not to store this pending request at all, but when there are no
/// messages to deliver will send a nil bytes message with a Status header of 404, this way you
/// can know when you reached the end of the stream for example. A 409 is returned if the
/// Consumer has reached MaxAckPending limits.
pub no_wait: Option<bool>,
#[serde(default, skip_serializing_if = "is_default")]
pub no_wait: bool,
}

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub(crate) struct StreamRequest {
pub subject: Option<String>,
#[serde(default, skip_serializing_if = "is_default")]
pub subject: String,
}

/// options for subscription
Expand Down
4 changes: 2 additions & 2 deletions tests/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ fn jetstream_basics() -> io::Result<()> {

#[test]
fn jetstream_libdoc_test() {
use nats::jetstream::{AckPolicy, Consumer, ConsumerConfig};
use nats::jetstream::Consumer;

let server = server();

Expand Down Expand Up @@ -185,5 +185,5 @@ fn jetstream_libdoc_test() {
let results: Vec<std::io::Result<usize>> =
consumer.process_batch(batch_size, |msg| Ok(msg.data.len()));
let flipped: std::io::Result<Vec<usize>> = results.into_iter().collect();
let sizes: Vec<usize> = flipped.unwrap();
let _sizes: Vec<usize> = flipped.unwrap();
}

0 comments on commit 88bdfbd

Please sign in to comment.