Skip to content

Commit

Permalink
Add rtt to Client
Browse files Browse the repository at this point in the history
  • Loading branch information
n1ghtmare committed Apr 7, 2023
1 parent 87d7f04 commit 42a515e
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 4 deletions.
92 changes: 91 additions & 1 deletion async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use regex::Regex;
use std::fmt::Display;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::trace;
Expand Down Expand Up @@ -463,6 +463,51 @@ impl Client {
Ok(())
}

/// Calculates the round trip time between this client and the server,
/// if the server is currently connected.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let rtt = client.rtt().await?;
/// println!("server rtt: {:?}", rtt);
/// # Ok(())
/// # }
/// ```
pub async fn rtt(&self) -> Result<Duration, RttError> {
let start = Instant::now();

let (ping_tx, ping_rx) = tokio::sync::oneshot::channel();
let (pong_tx, pong_rx) = tokio::sync::oneshot::channel();

self.sender
.send(Command::Ping {
ping_result: Some(ping_tx),
pong_result: Some(pong_tx),
})
.await
.map_err(|err| RttError::with_source(RttErrorKind::PingError, err))?;

ping_rx
.await
// first handle rx error
.map_err(|err| RttError::with_source(RttErrorKind::PingError, err))?
// second handle the atual ping error
.map_err(|err| RttError::with_source(RttErrorKind::PingError, err))?;

pong_rx
.await
// first handle rx error
.map_err(|err| RttError::with_source(RttErrorKind::PongError, err))?
// second handle the actual pong error
.map_err(|err| RttError::with_source(RttErrorKind::PongError, err))?;

Ok(start.elapsed())
}

/// Returns the current state of the connection.
///
/// # Examples
Expand Down Expand Up @@ -688,3 +733,48 @@ impl From<SubscribeError> for RequestError {
RequestError::with_source(RequestErrorKind::Other, e)
}
}

/// Error returned when doing a round-trip time measurement fails.
/// To enumerate over the variants, call [RttError::kind].
#[derive(Debug, Error)]
pub struct RttError {
kind: RttErrorKind,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
}

impl Display for RttError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let source_info = self
.source
.as_ref()
.map(|e| e.to_string())
.unwrap_or_else(|| "no details".into());
match self.kind {
RttErrorKind::PingError => {
write!(f, "failed to ping server: {}", source_info)
}
RttErrorKind::PongError => write!(f, "pong failed: {}", source_info),
}
}
}

impl RttError {
fn with_source<E>(kind: RttErrorKind, source: E) -> RttError
where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
RttError {
kind,
source: Some(source.into()),
}
}
pub fn kind(&self) -> RttErrorKind {
self.kind
}
}

#[derive(Debug, PartialEq, Clone, Copy)]
pub enum RttErrorKind {
PingError,
PongError,
}
35 changes: 32 additions & 3 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,10 @@ pub enum Command {
sid: u64,
max: Option<u64>,
},
Ping,
Ping {
ping_result: Option<oneshot::Sender<Result<(), io::Error>>>,
pong_result: Option<oneshot::Sender<Result<(), io::Error>>>,
},
Flush {
result: oneshot::Sender<Result<(), io::Error>>,
},
Expand Down Expand Up @@ -305,6 +308,7 @@ pub(crate) struct ConnectionHandler {
info_sender: tokio::sync::watch::Sender<ServerInfo>,
ping_interval: Interval,
flush_interval: Interval,
pending_pongs: Vec<oneshot::Sender<Result<(), io::Error>>>,
}

impl ConnectionHandler {
Expand All @@ -330,6 +334,7 @@ impl ConnectionHandler {
info_sender,
ping_interval,
flush_interval,
pending_pongs: Vec::new(),
}
}

Expand Down Expand Up @@ -398,6 +403,12 @@ impl ConnectionHandler {
ServerOp::Pong => {
debug!("received PONG");
self.pending_pings = self.pending_pings.saturating_sub(1);

while let Some(sender) = self.pending_pongs.pop() {
sender.send(Ok(())).map_err(|_| {
io::Error::new(io::ErrorKind::Other, "one shot failed to be received")
})?;
}
}
ServerOp::Error(error) => {
self.connector
Expand Down Expand Up @@ -508,7 +519,10 @@ impl ConnectionHandler {
}
}
}
Command::Ping => {
Command::Ping {
ping_result,
pong_result,
} => {
debug!(
"PING command. Pending pings {}, max pings {}",
self.pending_pings, self.max_pings
Expand All @@ -524,8 +538,23 @@ impl ConnectionHandler {
self.handle_disconnect().await?;
}

if let Err(_err) = self.connection.write_op(&ClientOp::Ping).await {
if let Err(err) = self.connection.write_op(&ClientOp::Ping).await {
self.handle_disconnect().await?;

if let Some(ping_result) = ping_result {
ping_result.send(Err(err)).map_err(|_| {
io::Error::new(io::ErrorKind::Other, "one shot failed to be received")
})?;
}
} else if let Some(ping_result) = ping_result {
if let Some(pong_result) = pong_result {
// Use this channel to return back a PONG
self.pending_pongs.push(pong_result);
}

ping_result.send(Ok(())).map_err(|_| {
io::Error::new(io::ErrorKind::Other, "one shot failed to be received")
})?;
}

self.handle_flush().await?;
Expand Down
11 changes: 11 additions & 0 deletions async-nats/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,4 +764,15 @@ mod client {
drop(servers.remove(0));
rx.recv().await;
}

#[tokio::test]
async fn rtt() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let rtt = client.rtt().await.unwrap();

println!("rtt: {:?}", rtt);
assert!(rtt.as_nanos() > 0);
}
}

0 comments on commit 42a515e

Please sign in to comment.