Skip to content

Commit

Permalink
Make dropped_subscriptions to atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed May 19, 2022
1 parent 34f7271 commit f6835a1
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 35 deletions.
55 changes: 21 additions & 34 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ pub enum Command {
subject: String,
queue_group: Option<String>,
sender: mpsc::Sender<Message>,
dropped_messages: Arc<AtomicU64>,
},
Unsubscribe {
sid: u64,
Expand All @@ -250,10 +251,6 @@ pub enum Command {
},
TryFlush,
Connect(ConnectInfo),
GetDroppedMessages {
response: tokio::sync::oneshot::Sender<u64>,
sid: u64,
},
}

/// `ClientOp` represents all actions of `Client`.
Expand Down Expand Up @@ -422,7 +419,7 @@ struct Subscription {
sender: mpsc::Sender<Message>,
queue_group: Option<String>,
delivered: u64,
dropped: u64,
dropped_messages: Arc<AtomicU64>,
max: Option<u64>,
}

Expand Down Expand Up @@ -536,7 +533,9 @@ impl ConnectionHandler {
}
}
Err(mpsc::error::TrySendError::Full(_)) => {
subscription.dropped += 1;
subscription
.dropped_messages
.fetch_add(1, Ordering::Relaxed);
self.events.send(ServerEvent::SlowConsumer(sid)).await.ok();
}
Err(mpsc::error::TrySendError::Closed(_)) => {
Expand Down Expand Up @@ -625,11 +624,12 @@ impl ConnectionHandler {
subject,
queue_group,
sender,
dropped_messages,
} => {
let subscription = Subscription {
sender,
delivered: 0,
dropped: 0,
dropped_messages,
max: None,
subject: subject.to_owned(),
queue_group: queue_group.to_owned(),
Expand Down Expand Up @@ -678,21 +678,6 @@ impl ConnectionHandler {
self.handle_disconnect().await?;
}
}
Command::GetDroppedMessages { response, sid } => {
response
.send(
self.subscriptions
.get(&sid)
.ok_or_else(|| {
std::io::Error::new(
ErrorKind::NotFound,
format!("could not find subscription {}", sid),
)
})?
.dropped,
)
.ok();
}
}

Ok(())
Expand Down Expand Up @@ -877,6 +862,8 @@ impl Client {
subject: String,
queue_group: Option<String>,
) -> Result<Subscriber, io::Error> {
let dropped_messages = Arc::new(AtomicU64::new(0));
// dropped_messages.fetch_add
let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
let (sender, receiver) = mpsc::channel(self.subscription_capacity);

Expand All @@ -886,11 +873,17 @@ impl Client {
subject,
queue_group,
sender,
dropped_messages: dropped_messages.clone(),
})
.await
.unwrap();

Ok(Subscriber::new(sid, self.sender.clone(), receiver))
Ok(Subscriber::new(
sid,
self.sender.clone(),
receiver,
dropped_messages,
))
}

pub async fn flush(&self) -> Result<(), Error> {
Expand Down Expand Up @@ -1093,18 +1086,21 @@ pub struct Subscriber {
sid: u64,
receiver: mpsc::Receiver<Message>,
sender: mpsc::Sender<Command>,
dropped_messages: Arc<AtomicU64>,
}

impl Subscriber {
fn new(
sid: u64,
sender: mpsc::Sender<Command>,
receiver: mpsc::Receiver<Message>,
dropped_messages: Arc<AtomicU64>,
) -> Subscriber {
Subscriber {
sid,
sender,
receiver,
dropped_messages,
}
}

Expand Down Expand Up @@ -1183,17 +1179,8 @@ impl Subscriber {
///
/// # Ok(())
/// # }
pub async fn dropped(&mut self) -> io::Result<u64> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.sender
.send(Command::GetDroppedMessages {
response: tx,
sid: self.sid,
})
.await
.map_err(|err| io::Error::new(ErrorKind::Other, err))?;
rx.await
.map_err(|err| io::Error::new(ErrorKind::Other, err))
pub async fn dropped(&mut self) -> u64 {
self.dropped_messages.load(Ordering::Relaxed)
}
}

Expand Down
2 changes: 1 addition & 1 deletion async-nats/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ mod client {
.unwrap();
client.flush().await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let dropped = sub.dropped().await.unwrap();
let dropped = sub.dropped().await;
println!("dropped: {}", dropped);
assert_eq!(dropped, 1);

Expand Down

0 comments on commit f6835a1

Please sign in to comment.