Skip to content

Commit

Permalink
Merge pull request #220 from nats-io/tyler_0.15.2
Browse files Browse the repository at this point in the history
Cut 0.15.2. Closes #210.
  • Loading branch information
spacejam committed Sep 15, 2021
2 parents 5ec48a4 + 0732824 commit 161cbe8
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 19 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
# 0.15.2

# Bug Fixes

- #217 Fix thread leak when `asynk::Subscription`
is dropped without explicitly `unsubscribe` first.

# 0.15.1

# Improvements

- Minor debug statement removed that was present in
previous release.

# 0.15.0

## Breaking Changes
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "nats"
version = "0.15.1"
version = "0.15.2"
description = "A Rust NATS client"
authors = ["Derek Collison <derek@nats.io>", "Tyler Neely <tyler@nats.io>", "Stjepan Glavina <stjepan@nats.io>"]
edition = "2018"
Expand Down
23 changes: 8 additions & 15 deletions src/asynk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,10 @@ impl Connection {
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
let sub = unblock(move || inner.request_multi(&subject, msg)).await?;
let (closer_tx, closer_rx) = crossbeam_channel::bounded(0);
let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0);
Ok(Subscription {
inner: sub,
closer_tx,
_closer_tx,
closer_rx,
})
}
Expand All @@ -214,10 +214,10 @@ impl Connection {
let subject = subject.to_string();
let inner = self.inner.clone();
let inner = unblock(move || inner.subscribe(&subject)).await?;
let (closer_tx, closer_rx) = crossbeam_channel::bounded(0);
let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0);
Ok(Subscription {
inner,
closer_tx,
_closer_tx,
closer_rx,
})
}
Expand All @@ -233,10 +233,10 @@ impl Connection {
let inner = self.inner.clone();
let inner =
unblock(move || inner.queue_subscribe(&subject, &queue)).await?;
let (closer_tx, closer_rx) = crossbeam_channel::bounded(0);
let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0);
Ok(Subscription {
inner,
closer_tx,
_closer_tx,
closer_rx,
})
}
Expand Down Expand Up @@ -319,19 +319,14 @@ impl Connection {
}

/// A subscription to a subject.
///
/// Due to async limitations (lack of `AsyncDrop` etc...),
/// please call `Subscription::unsubscribe().await` manually
/// before dropping `Subscription` to avoid blocking the
/// runtime.
#[derive(Debug)]
pub struct Subscription {
inner: crate::Subscription,

// Dropping this signals to any receivers that the subscription has been closed. These should
// be dropped after inner is dropped, so if another thread is currently blocking, the
// subscription is closed on that thread.
closer_tx: Sender<()>,
_closer_tx: Sender<()>,
closer_rx: Receiver<()>,
}

Expand Down Expand Up @@ -382,9 +377,7 @@ impl Subscription {
}

/// Stops listening for new messages and discards the remaining queued
/// messages. This should always be called before dropping
/// `nats::asynk::Subscription` to avoid blocking the non-async `Drop`
/// implementation.
/// messages.
pub async fn unsubscribe(&self) -> io::Result<()> {
let inner = self.inner.clone();
unblock(move || inner.unsubscribe()).await
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ const DEFAULT_FLUSH_TIMEOUT: Duration = Duration::from_secs(10);

/// Information sent by the server back to this client
/// during initial connection, and possibly again later.
#[allow(unused)]
#[derive(Debug, Default, Clone)]
struct ServerInfo {
/// The unique identifier of the NATS server.
Expand Down
6 changes: 3 additions & 3 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl Message {
pub fn jetstream_message_info(
&self,
) -> Option<crate::jetstream::JetStreamMessageInfo<'_>> {
const PREFIX: &'static str = "$JS.ACK.";
const PREFIX: &str = "$JS.ACK.";
const SKIP: usize = PREFIX.len();

let mut reply: &str = self.reply.as_ref()?;
Expand All @@ -173,9 +173,9 @@ impl Message {
// parsing this.
let mut tokens: [Option<&str>; 10] = [None; 10];
let mut n_tokens = 0;
for index in 0..10 {
for each_token in &mut tokens {
if let Some(token) = split.next() {
tokens[index] = Some(token);
*each_token = Some(token);
n_tokens += 1;
}
}
Expand Down

0 comments on commit 161cbe8

Please sign in to comment.