fix: remove delay from last request latency call (#3579)
Description
---

- Allows `last_request_latency` to be queried instantly, even when the client is busy with another request.

Motivation and Context
---
Ref #3577

Calling `get_last_request_latency` could be delayed because each client runs on a single task and can therefore handle only one service request at a time: the latency query was sent down the same channel as service requests and had to wait its turn. This commit publishes the latency on a `tokio::sync::watch` channel instead, so the latest value can be read synchronously without going through the worker task (see the sketch below).
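
A minimal sketch of the watch-channel pattern the client now uses; the names (`latency_tx`, `latency_rx`) are illustrative, not the actual `tari_comms` types. The worker task publishes each measured latency through a `watch::Sender`, and any holder of the `watch::Receiver` reads the latest value synchronously, even while the worker is busy with another request:

```rust
use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // Created alongside the client; `None` until the first response is measured.
    let (latency_tx, latency_rx) = watch::channel::<Option<Duration>>(None);

    // Worker side: record the observed latency after handling a request.
    let worker = tokio::spawn(async move {
        // ... handle a (slow) RPC request ...
        let _ = latency_tx.send(Some(Duration::from_millis(42)));
    });
    worker.await.unwrap();

    // Caller side: a plain, non-blocking read of the most recent value;
    // it never queues behind the worker's in-flight requests.
    let last: Option<Duration> = *latency_rx.borrow();
    assert_eq!(last, Some(Duration::from_millis(42)));
}
```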

How Has This Been Tested?
---
Added a unit test (`last_request_latency::it_returns_the_latency_until_the_first_response` in `comms/src/protocol/rpc/client/tests.rs`).
stringhandler committed Nov 19, 2021
2 parents f6d2995 + 73ab6a3 commit c82a8ca
Showing 11 changed files with 63 additions and 41 deletions.
4 changes: 0 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

@@ -159,7 +159,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
remote_num_kernels - local_num_kernels,
);

-let latency = client.get_last_request_latency().await?;
+let latency = client.get_last_request_latency();
debug!(
target: LOG_TARGET,
"Initiating kernel sync with peer `{}` (latency = {}ms)",
@@ -287,7 +287,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let end = remote_num_outputs;
let end_hash = to_header.hash();

-let latency = client.get_last_request_latency().await?;
+let latency = client.get_last_request_latency();
debug!(
target: LOG_TARGET,
"Initiating output sync with peer `{}` (latency = {}ms)",
@@ -218,7 +218,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
mut conn: PeerConnection,
) -> Result<(), BlockHeaderSyncError> {
let mut client = conn.connect_rpc::<rpc::BaseNodeSyncRpcClient>().await?;
-let latency = client.get_last_request_latency().await?;
+let latency = client.get_last_request_latency();
debug!(
target: LOG_TARGET,
"Initiating header sync with peer `{}` (sync latency = {}ms)",
2 changes: 1 addition & 1 deletion base_layer/wallet/src/base_node_service/monitor.rs
@@ -151,7 +151,7 @@ where
);

let timer = Instant::now();
-let latency = match client.get_last_request_latency().await? {
+let latency = match client.get_last_request_latency() {
Some(latency) => latency,
None => continue,
};
@@ -268,7 +268,7 @@ where TBackend: WalletBackend + 'static
.connect_rpc_using_builder(BaseNodeSyncRpcClient::builder().with_deadline(Duration::from_secs(60)))
.await?;

-let latency = client.get_last_request_latency().await?;
+let latency = client.get_last_request_latency();
self.publish_event(UtxoScannerEvent::ConnectedToBaseNode(
peer.clone(),
latency.unwrap_or_default(),
6 changes: 0 additions & 6 deletions common/Cargo.toml
@@ -27,12 +27,6 @@ sha2 = "0.9.5"
path-clean = "0.1.0"
tari_storage = { version = "^0.21", path = "../infrastructure/storage"}
tracing = "0.1.26"
-tracing-opentelemetry = "0.15.0"
-tracing-subscriber = "0.2.20"
-
-# network tracing, rt-tokio for async batch export
-opentelemetry = { version = "0.16", default-features = false, features = ["trace","rt-tokio"] }
-opentelemetry-jaeger = { version="0.15", features=["rt-tokio"]}

anyhow = { version = "1.0", optional = true }
git2 = { version = "0.8", optional = true }
4 changes: 2 additions & 2 deletions comms/rpc_macros/src/generator.rs
@@ -208,8 +208,8 @@ impl RpcCodeGenerator {

#client_methods

-pub async fn get_last_request_latency(&mut self) -> Result<Option<std::time::Duration>, #dep_mod::RpcError> {
-    self.inner.get_last_request_latency().await
+pub fn get_last_request_latency(&mut self) -> Option<std::time::Duration> {
+    self.inner.get_last_request_latency()
}

pub async fn ping(&mut self) -> Result<std::time::Duration, #dep_mod::RpcError> {
40 changes: 20 additions & 20 deletions comms/src/protocol/rpc/client/mod.rs
@@ -73,7 +73,7 @@ use std::{
use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::{
io::{AsyncRead, AsyncWrite},
-sync::{mpsc, oneshot, Mutex},
+sync::{mpsc, oneshot, watch, Mutex},
time,
};
use tower::{Service, ServiceExt};
@@ -105,7 +105,8 @@ impl RpcClient {
let (request_tx, request_rx) = mpsc::channel(1);
let shutdown = Shutdown::new();
let shutdown_signal = shutdown.to_signal();
-let connector = ClientConnector::new(request_tx, shutdown);
+let (last_request_latency_tx, last_request_latency_rx) = watch::channel(None);
+let connector = ClientConnector::new(request_tx, last_request_latency_rx, shutdown);
let (ready_tx, ready_rx) = oneshot::channel();
let tracing_id = tracing::Span::current().id();
task::spawn({
@@ -116,6 +117,7 @@
config,
node_id,
request_rx,
+last_request_latency_tx,
framed,
ready_tx,
protocol_name,
@@ -172,7 +174,7 @@ impl RpcClient {
}

/// Return the latency of the last request
-pub fn get_last_request_latency(&mut self) -> impl Future<Output = Result<Option<Duration>, RpcError>> + '_ {
+pub fn get_last_request_latency(&mut self) -> Option<Duration> {
self.connector.get_last_request_latency()
}

@@ -315,13 +317,19 @@ impl Default for RpcClientConfig {
#[derive(Clone)]
pub struct ClientConnector {
inner: mpsc::Sender<ClientRequest>,
+last_request_latency_rx: watch::Receiver<Option<Duration>>,
shutdown: Arc<Mutex<Shutdown>>,
}

impl ClientConnector {
-pub(self) fn new(sender: mpsc::Sender<ClientRequest>, shutdown: Shutdown) -> Self {
+pub(self) fn new(
+    sender: mpsc::Sender<ClientRequest>,
+    last_request_latency_rx: watch::Receiver<Option<Duration>>,
+    shutdown: Shutdown,
+) -> Self {
Self {
inner: sender,
+last_request_latency_rx,
shutdown: Arc::new(Mutex::new(shutdown)),
}
}
@@ -331,14 +339,8 @@ impl ClientConnector {
lock.trigger();
}

-pub async fn get_last_request_latency(&mut self) -> Result<Option<Duration>, RpcError> {
-    let (reply, reply_rx) = oneshot::channel();
-    self.inner
-        .send(ClientRequest::GetLastRequestLatency(reply))
-        .await
-        .map_err(|_| RpcError::ClientClosed)?;
-
-    reply_rx.await.map_err(|_| RpcError::RequestCancelled)
+pub fn get_last_request_latency(&mut self) -> Option<Duration> {
+    *self.last_request_latency_rx.borrow()
}

pub async fn send_ping(&mut self) -> Result<Duration, RpcError> {
@@ -391,23 +393,25 @@ struct RpcClientWorker<TSubstream> {
config: RpcClientConfig,
node_id: NodeId,
request_rx: mpsc::Receiver<ClientRequest>,
+last_request_latency_tx: watch::Sender<Option<Duration>>,
framed: CanonicalFraming<TSubstream>,
// Request ids are limited to u16::MAX because varint encoding is used over the wire and the magnitude of the value
// sent determines the byte size. A u16 will be more than enough for the purpose
next_request_id: u16,
ready_tx: Option<oneshot::Sender<Result<(), RpcError>>>,
-last_request_latency: Option<Duration>,
protocol_id: ProtocolId,
shutdown_signal: ShutdownSignal,
}

impl<TSubstream> RpcClientWorker<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
{
+#[allow(clippy::too_many_arguments)]
pub(self) fn new(
config: RpcClientConfig,
node_id: NodeId,
request_rx: mpsc::Receiver<ClientRequest>,
+last_request_latency_tx: watch::Sender<Option<Duration>>,
framed: CanonicalFraming<TSubstream>,
ready_tx: oneshot::Sender<Result<(), RpcError>>,
protocol_id: ProtocolId,
@@ -420,7 +424,7 @@
framed,
next_request_id: 0,
ready_tx: Some(ready_tx),
-last_request_latency: None,
+last_request_latency_tx,
protocol_id,
shutdown_signal,
}
@@ -454,7 +458,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
self.protocol_name(),
latency
);
-self.last_request_latency = Some(latency);
+let _ = self.last_request_latency_tx.send(Some(latency));
if let Some(r) = self.ready_tx.take() {
let _ = r.send(Ok(()));
}
@@ -514,9 +518,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
SendRequest { request, reply } => {
self.do_request_response(request, reply).await?;
},
-GetLastRequestLatency(reply) => {
-    let _ = reply.send(self.last_request_latency);
-},
SendPing(reply) => {
self.do_ping_pong(reply).await?;
},
@@ -647,7 +648,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
let resp = match self.read_response(request_id).await {
Ok(resp) => {
if let Some(t) = timer.take() {
-self.last_request_latency = Some(t.elapsed());
+let _ = self.last_request_latency_tx.send(Some(t.elapsed()));
}
event!(Level::TRACE, "Message received");
trace!(
@@ -821,7 +822,6 @@ pub enum ClientRequest {
request: BaseRequest<Bytes>,
reply: oneshot::Sender<mpsc::Receiver<Result<Response<Bytes>, RpcStatus>>>,
},
-GetLastRequestLatency(oneshot::Sender<Option<Duration>>),
SendPing(oneshot::Sender<Result<Duration, RpcStatus>>),
}

34 changes: 33 additions & 1 deletion comms/src/protocol/rpc/client/tests.rs
@@ -25,7 +25,7 @@ use crate::{
protocol::{
rpc::{
test::{
-greeting_service::{GreetingClient, GreetingServer, GreetingService},
+greeting_service::{GreetingClient, GreetingServer, GreetingService, SlowStreamRequest},
mock::create_mocked_rpc_context,
},
NamedProtocolService,
@@ -39,9 +39,11 @@ use crate::{
runtime::task,
test_utils::mocks::{new_peer_connection_mock_pair, PeerConnectionMockState},
};
+use std::{env, time::Duration};
use tari_shutdown::Shutdown;
use tari_test_utils::{async_assert_eventually, unpack_enum};
use tokio::sync::mpsc;
+use tokio_stream::StreamExt;

async fn setup(num_concurrent_sessions: usize) -> (PeerConnection, PeerConnectionMockState, Shutdown) {
let (conn1, conn1_state, conn2, conn2_state) = new_peer_connection_mock_pair().await;
@@ -171,3 +173,33 @@ mod lazy_pool {
unpack_enum!(RpcClientPoolError::PeerConnectionDropped { .. } = err);
}
}

+mod last_request_latency {
+    use super::*;
+
+    #[runtime::test]
+    async fn it_returns_the_latency_until_the_first_response() {
+        let (mut conn, _, _shutdown) = setup(1).await;
+
+        let mut client = conn.connect_rpc::<GreetingClient>().await.unwrap();
+
+        let resp = client
+            .slow_stream(SlowStreamRequest {
+                num_items: 100,
+                item_size: 10,
+                delay_ms: 10,
+            })
+            .await
+            .unwrap();
+
+        resp.collect::<Vec<_>>().await.into_iter().for_each(|r| {
+            r.unwrap();
+        });
+
+        let latency = client.get_last_request_latency().unwrap();
+        // CI could be really slow, so to prevent flakiness exclude the assert
+        if env::var("CI").is_err() {
+            assert!(latency < Duration::from_millis(100));
+        }
+    }
+}
4 changes: 2 additions & 2 deletions comms/src/protocol/rpc/test/greeting_service.rs
@@ -447,8 +447,8 @@ impl GreetingClient {
self.inner.server_streaming(request, 8).await
}

-pub async fn get_last_request_latency(&mut self) -> Result<Option<Duration>, RpcError> {
-    self.inner.get_last_request_latency().await
+pub fn get_last_request_latency(&mut self) -> Option<Duration> {
+    self.inner.get_last_request_latency()
}

pub async fn ping(&mut self) -> Result<Duration, RpcError> {
2 changes: 1 addition & 1 deletion comms/src/protocol/rpc/test/smoke.rs
@@ -135,7 +135,7 @@ async fn request_response_errors_and_streaming() {
.unwrap();

// Latency is available "for free" as part of the connect protocol
-assert!(client.get_last_request_latency().await.unwrap().is_some());
+assert!(client.get_last_request_latency().is_some());

let resp = client
.say_hello(SayHelloRequest {
