From ba1ad2fc7257f668d139c61340febee0cbc74b85 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Wed, 15 Sep 2021 15:26:46 +0200 Subject: [PATCH 1/2] Cut 0.15.2 --- CHANGELOG.md | 14 ++++++++++++++ Cargo.toml | 2 +- src/asynk.rs | 23 ++++++++--------------- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bbc1d83ad..72b91c643 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,17 @@ +# 0.15.2 + +# Bug Fixes + +- #217 Fix thread leak when `asynk::Subscription` + is dropped without explicitly `unsubscribe` first. + +# 0.15.1 + +# Improvements + +- Minor debug statement removed that was present in + previous release. + # 0.15.0 ## Breaking Changes diff --git a/Cargo.toml b/Cargo.toml index f5472d829..2ac45b3f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nats" -version = "0.15.1" +version = "0.15.2" description = "A Rust NATS client" authors = ["Derek Collison ", "Tyler Neely ", "Stjepan Glavina "] edition = "2018" diff --git a/src/asynk.rs b/src/asynk.rs index 5d9d85373..f82a964aa 100644 --- a/src/asynk.rs +++ b/src/asynk.rs @@ -201,10 +201,10 @@ impl Connection { let msg = msg.as_ref().to_vec(); let inner = self.inner.clone(); let sub = unblock(move || inner.request_multi(&subject, msg)).await?; - let (closer_tx, closer_rx) = crossbeam_channel::bounded(0); + let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0); Ok(Subscription { inner: sub, - closer_tx, + _closer_tx, closer_rx, }) } @@ -214,10 +214,10 @@ impl Connection { let subject = subject.to_string(); let inner = self.inner.clone(); let inner = unblock(move || inner.subscribe(&subject)).await?; - let (closer_tx, closer_rx) = crossbeam_channel::bounded(0); + let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0); Ok(Subscription { inner, - closer_tx, + _closer_tx, closer_rx, }) } @@ -233,10 +233,10 @@ impl Connection { let inner = self.inner.clone(); let inner = unblock(move || inner.queue_subscribe(&subject, &queue)).await?; - let (closer_tx, closer_rx) = crossbeam_channel::bounded(0); + let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0); Ok(Subscription { inner, - closer_tx, + _closer_tx, closer_rx, }) } @@ -319,11 +319,6 @@ impl Connection { } /// A subscription to a subject. -/// -/// Due to async limitations (lack of `AsyncDrop` etc...), -/// please call `Subscription::unsubscribe().await` manually -/// before dropping `Subscription` to avoid blocking the -/// runtime. #[derive(Debug)] pub struct Subscription { inner: crate::Subscription, @@ -331,7 +326,7 @@ pub struct Subscription { // Dropping this signals to any receivers that the subscription has been closed. These should // be dropped after inner is dropped, so if another thread is currently blocking, the // subscription is closed on that thread. - closer_tx: Sender<()>, + _closer_tx: Sender<()>, closer_rx: Receiver<()>, } @@ -382,9 +377,7 @@ impl Subscription { } /// Stops listening for new messages and discards the remaining queued - /// messages. This should always be called before dropping - /// `nats::asynk::Subscription` to avoid blocking the non-async `Drop` - /// implementation. + /// messages. pub async fn unsubscribe(&self) -> io::Result<()> { let inner = self.inner.clone(); unblock(move || inner.unsubscribe()).await From 073282478c0234b5564e1530dc4b79d19067a755 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Wed, 15 Sep 2021 17:09:12 +0200 Subject: [PATCH 2/2] Clippy feedback --- src/lib.rs | 1 + src/message.rs | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 208adf8c0..5e8dea602 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -243,6 +243,7 @@ const DEFAULT_FLUSH_TIMEOUT: Duration = Duration::from_secs(10); /// Information sent by the server back to this client /// during initial connection, and possibly again later. +#[allow(unused)] #[derive(Debug, Default, Clone)] struct ServerInfo { /// The unique identifier of the NATS server. diff --git a/src/message.rs b/src/message.rs index 34ae29abd..13d09391b 100644 --- a/src/message.rs +++ b/src/message.rs @@ -155,7 +155,7 @@ impl Message { pub fn jetstream_message_info( &self, ) -> Option> { - const PREFIX: &'static str = "$JS.ACK."; + const PREFIX: &str = "$JS.ACK."; const SKIP: usize = PREFIX.len(); let mut reply: &str = self.reply.as_ref()?; @@ -173,9 +173,9 @@ impl Message { // parsing this. let mut tokens: [Option<&str>; 10] = [None; 10]; let mut n_tokens = 0; - for index in 0..10 { + for each_token in &mut tokens { if let Some(token) = split.next() { - tokens[index] = Some(token); + *each_token = Some(token); n_tokens += 1; } }