Skip to content

Commit

Permalink
Merge branch 'main' into better-consumer-impl
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Jul 29, 2023
2 parents 9a31cce + f20934d commit d0d57e8
Show file tree
Hide file tree
Showing 19 changed files with 893 additions and 1,064 deletions.
25 changes: 25 additions & 0 deletions async-nats/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,28 @@
# 0.31.0
This release focuses on improvements of heartbeats in JetStream Consumers.

Heartbeats are a tool that tells the user if the given consumer is healthy but does not get any messages or if the reason for no message is an actual problem.
However, if the user was not polling the `Stream` future for the next messages for a long time (because it was slowly processing messages), that could trigger idle heartbeats, as the library could not see the heartbeat messages without messages being polled.

This release fixes it by starting the idle heartbeat timer only after Stream future is polled (which usually means calling `messages.next().await`).

## What's Changed
* Fix unwrap from `HeaderName::from_str` call by @caspervonb in https://github.com/nats-io/nats.rs/pull/1032
* Use idiomatic method for writing `Option` and accessing inner `T` by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1034
* Add missing sequence number reset by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1035
* Fix header name range validation by @caspervonb in https://github.com/nats-io/nats.rs/pull/1031
* Simplify consumer checking logic by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1033
* Fix millis -> nanos typo in `BatchConfig` `expiration` by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1037
* Fix kv purge with prefix (thanks @brooksmtownsend for reporting it!) by @Jarema in https://github.com/nats-io/nats.rs/pull/1055
* Remove memcpy in object store PUT by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1039
* Drop subscription on list done by @Jarema in https://github.com/nats-io/nats.rs/pull/1041
* Improve push consumer handling when encountering slow consumers by @Jarema in https://github.com/nats-io/nats.rs/pull/1044
* Rework idle heartbeat for pull consumers by @Jarema in https://github.com/nats-io/nats.rs/pull/1046
* Rework push consumer heartbeats handling by @Jarema in https://github.com/nats-io/nats.rs/pull/1048


**Full Changelog**: https://github.com/nats-io/nats.rs/compare/async-nats/v0.30.0...async-nats/v0.30.1

# 0.30.0
## Overview
This is a big release that introduces almost all breaking changes and API refinements before 1.0.0.
Expand Down
6 changes: 3 additions & 3 deletions async-nats/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "async-nats"
authors = ["Tomasz Pietrek <tomasz@nats.io>", "Casper Beyer <caspervonb@pm.me>"]
version = "0.30.0"
version = "0.31.0"
edition = "2021"
rust = "1.64.0"
description = "A async Rust NATS client"
Expand Down Expand Up @@ -29,7 +29,7 @@ itoa = "1"
url = { version = "2"}
tokio-rustls = "0.24"
rustls-pemfile = "1.0.2"
nuid = "0.3.2"
nuid = "0.4.1"
serde_nanos = "0.1.3"
time = { version = "0.3.20", features = ["parsing", "formatting", "serde", "serde-well-known"] }
rustls-native-certs = "0.6"
Expand All @@ -42,7 +42,7 @@ rand = "0.8"
webpki = { package = "rustls-webpki", version = "0.101.1", features = ["alloc", "std"] }

[dev-dependencies]
criterion = { version = "0.3", features = ["async_tokio"]}
criterion = { version = "0.5", features = ["async_tokio"]}
nats-server = { path = "../nats-server" }
rand = "0.8"
tokio = { version = "1.25.0", features = ["rt-multi-thread"] }
Expand Down
6 changes: 3 additions & 3 deletions async-nats/src/auth_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
use nkeys::KeyPair;
use once_cell::sync::Lazy;
use regex::Regex;
use std::{io, path::PathBuf};
use std::{io, path::Path};

/// Loads user credentials file with jwt and key. Return file contents.
/// Uses tokio non-blocking io
pub(crate) async fn load_creds(path: PathBuf) -> io::Result<String> {
tokio::fs::read_to_string(&path).await.map_err(|err| {
pub(crate) async fn load_creds(path: &Path) -> io::Result<String> {
tokio::fs::read_to_string(path).await.map_err(|err| {
io::Error::new(
io::ErrorKind::Other,
format!("loading creds file '{}': {}", path.display(), err),
Expand Down
71 changes: 27 additions & 44 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::connection::State;
use crate::ServerInfo;

use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
use crate::error::Error;
use bytes::Bytes;
use futures::future::TryFutureExt;
use futures::stream::StreamExt;
Expand Down Expand Up @@ -609,7 +610,7 @@ impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
}
}

#[derive(Debug, PartialEq, Copy, Clone)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
/// There are services listening on requested subject, but they didn't respond
/// in time.
Expand All @@ -620,51 +621,33 @@ pub enum RequestErrorKind {
Other,
}

/// Error returned when a core NATS request fails.
/// To be enumerate over the variants, call [RequestError::kind].
#[derive(Debug)]
pub struct RequestError {
kind: RequestErrorKind,
source: Option<crate::Error>,
}

crate::error_impls!(RequestError, RequestErrorKind);

impl Display for RequestError {
impl Display for RequestErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.kind {
RequestErrorKind::TimedOut => write!(f, "request timed out"),
RequestErrorKind::NoResponders => write!(f, "no responders"),
RequestErrorKind::Other => write!(f, "request failed: {:?}", self.kind),
match self {
Self::TimedOut => write!(f, "request timed out"),
Self::NoResponders => write!(f, "no responders"),
Self::Other => write!(f, "request failed"),
}
}
}

/// Error returned when flushing the messages buffered on the client fails.
/// To be enumerate over the variants, call [FlushError::kind].
#[derive(Debug)]
pub struct FlushError {
kind: FlushErrorKind,
source: Option<crate::Error>,
}
/// Error returned when a core NATS request fails.
/// To be enumerate over the variants, call [RequestError::kind].
pub type RequestError = Error<RequestErrorKind>;

crate::error_impls!(FlushError, FlushErrorKind);
impl From<PublishError> for RequestError {
fn from(e: PublishError) -> Self {
RequestError::with_source(RequestErrorKind::Other, e)
}
}

impl Display for FlushError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let source_info = self
.source
.as_ref()
.map(|e| e.to_string())
.unwrap_or_else(|| "no details".into());
match self.kind {
FlushErrorKind::SendError => write!(f, "failed to send flush request: {}", source_info),
FlushErrorKind::FlushError => write!(f, "flush failed: {}", source_info),
}
impl From<SubscribeError> for RequestError {
fn from(e: SubscribeError) -> Self {
RequestError::with_source(RequestErrorKind::Other, e)
}
}

#[derive(Debug, PartialEq, Clone, Copy)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FlushErrorKind {
/// Sending the flush failed client side.
SendError,
Expand All @@ -674,13 +657,13 @@ pub enum FlushErrorKind {
FlushError,
}

impl From<PublishError> for RequestError {
fn from(e: PublishError) -> Self {
RequestError::with_source(RequestErrorKind::Other, e)
}
}
impl From<SubscribeError> for RequestError {
fn from(e: SubscribeError) -> Self {
RequestError::with_source(RequestErrorKind::Other, e)
impl Display for FlushErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::SendError => write!(f, "failed to send flush request"),
Self::FlushError => write!(f, "flush failed"),
}
}
}

pub type FlushError = Error<FlushErrorKind>;

0 comments on commit d0d57e8

Please sign in to comment.