Skip to content

Commit

Permalink
feat: add more instrumentation to tari_comms
Browse files Browse the repository at this point in the history
+ improve logs around stream interruptions
+ be more permissive of responses for the incorrect request_id
  • Loading branch information
sdbondi committed Nov 18, 2021
1 parent 337bc6f commit 6df9aae
Show file tree
Hide file tree
Showing 34 changed files with 475 additions and 139 deletions.
6 changes: 5 additions & 1 deletion applications/daily_tests/washing_machine.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,18 @@ function WashingMachine(options) {
`🚀 Launching washing machine (numTransactions = ${numTransactions}, numRounds = ${numRounds}, sleep = ${sleepAfterRound}s)`
);

debug(`Connecting to wallet1 at ${wallet1Grpc}...`);
await this.wallet1.connect(wallet1Grpc);
debug(`Connected.`);

debug("Compiling and starting applications...");
let wallet2Process = null;
// Start wallet2
if (wallet2Grpc) {
this.wallet2 = new WalletClient();
debug(`Connecting to wallet2 at ${wallet1Grpc}...`);
await this.wallet2.connect(wallet2Grpc);
} else {
debug("Compiling wallet2...");
const port = await getFreePort(20000, 25000);
wallet2Process = createGrpcWallet(
baseNodeSeed,
Expand All @@ -148,6 +151,7 @@ function WashingMachine(options) {
true
);
wallet2Process.baseDir = "./wallet";
debug("Starting wallet2...");
await wallet2Process.startNew();
this.wallet2 = await wallet2Process.connectClient();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,14 @@ fn parse_make_it_rain(mut args: SplitWhitespace) -> Result<Vec<ParsedArgument>,
parsed_args.push(ParsedArgument::PublicKey(pubkey));

// transaction type
let txn_type = args
.next()
.ok_or_else(|| ParseError::Empty("transaction type".to_string()))?;
let txn_type = args.next();
let negotiated = match txn_type {
"negotiated" => true,
"one_sided" => false,
Some("negotiated") | Some("interactive") => true,
Some("one_sided") | Some("one-sided") | Some("onesided") => false,
_ => {
println!("Invalid data provided for <transaction type>, must be 'negotiated' or 'one_sided'\n");
println!("Invalid data provided for <transaction type>, must be 'interactive' or 'one-sided'\n");
return Err(ParseError::Invalid(
"Invalid data provided for <transaction type>, must be 'negotiated' or 'one_sided'".to_string(),
"Invalid data provided for <transaction type>, must be 'interactive' or 'one-sided'".to_string(),
));
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {

let resp = match client.find_chain_split(request).await {
Ok(r) => r,
Err(RpcError::RequestFailed(err)) if err.status_code().is_not_found() => {
Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
// This round we sent less hashes than the max, so the next round will not have any more hashes to
// send. Exit early in this case.
if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/sync/rpc/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ mod sync_blocks {
};
let req = rpc_request_mock.request_with_context(Default::default(), msg);
let err = service.sync_blocks(req).await.unwrap_err();
unpack_enum!(RpcStatusCode::NotFound = err.status_code());
unpack_enum!(RpcStatusCode::NotFound = err.as_status_code());
}

#[tokio::test]
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/mempool/rpc/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ mod get_tx_state_by_excess_sig {
.await
.unwrap_err();

unpack_enum!(RpcStatusCode::BadRequest = status.status_code());
unpack_enum!(RpcStatusCode::BadRequest = status.as_status_code());
}
}

Expand Down Expand Up @@ -174,6 +174,6 @@ mod submit_transaction {
.await
.unwrap_err();

unpack_enum!(RpcStatusCode::BadRequest = status.status_code());
unpack_enum!(RpcStatusCode::BadRequest = status.as_status_code());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
use log::*;
use std::{collections::HashMap, convert::TryInto, sync::Arc};
use tari_common_types::types::BlockHash;
use tari_comms::protocol::rpc::{RpcError::RequestFailed, RpcStatusCode::NotFound};
use tari_comms::protocol::rpc::RpcError::RequestFailed;
use tari_core::{
base_node::rpc::BaseNodeWalletRpcClient,
blocks::BlockHeader,
Expand Down Expand Up @@ -353,7 +353,7 @@ where
info!(target: LOG_TARGET, "Error asking base node for header:{}", rpc_error);
match &rpc_error {
RequestFailed(status) => {
if status.status_code() == NotFound {
if status.as_status_code().is_not_found() {
return Ok(None);
} else {
return Err(rpc_error.into());
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/transaction_service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl Default for TransactionServiceConfig {
direct_send_timeout: Duration::from_secs(20),
broadcast_send_timeout: Duration::from_secs(60),
low_power_polling_timeout: Duration::from_secs(300),
transaction_resend_period: Duration::from_secs(3600),
transaction_resend_period: Duration::from_secs(600),
resend_response_cooldown: Duration::from_secs(300),
pending_transaction_cancellation_timeout: Duration::from_secs(259200), // 3 Days
num_confirmations_required: 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ where
warn!(target: LOG_TARGET, "Error asking base node for header:{}", rpc_error);
match &rpc_error {
RequestFailed(status) => {
if status.status_code() == NotFound {
if status.as_status_code() == NotFound {
return Ok(None);
} else {
return Err(rpc_error.into());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ where TBackend: WalletBackend + 'static
// this returns the index of the vec of hashes we sent it, that is the last hash it knows of.
match client.find_chain_split(request).await {
Ok(_) => Ok(metadata.utxo_index + 1),
Err(RpcError::RequestFailed(err)) if err.status_code().is_not_found() => {
Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
warn!(target: LOG_TARGET, "Reorg detected: {}", err);
// The node does not know of the last hash we scanned, thus we had a chain split.
// We now start at 0 again.
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/examples/memory_net/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ pub async fn network_connectivity_stats(nodes: &[TestNode], wallets: &[TestNode]
total += t;
avg += a;
println!(
"{} total connections on the network. ({} per node on average)",
"{} total connections on the network. ({} per peer on average)",
total,
avg / (wallets.len() + nodes.len())
);
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/src/rpc/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ mod get_closer_peers {
let node_id = NodeId::default();
let req = mock.request_with_context(node_id, req);
let err = service.get_closer_peers(req).await.unwrap_err();
assert_eq!(err.status_code(), RpcStatusCode::BadRequest);
assert_eq!(err.as_status_code(), RpcStatusCode::BadRequest);
}
}

Expand Down
2 changes: 1 addition & 1 deletion comms/rpc_macros/tests/macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async fn it_returns_an_error_for_invalid_method_nums() {
.await
.unwrap_err();

unpack_enum!(RpcStatusCode::UnsupportedMethod = err.status_code());
unpack_enum!(RpcStatusCode::UnsupportedMethod = err.as_status_code());
}

#[tokio::test]
Expand Down
4 changes: 4 additions & 0 deletions comms/src/connection_manager/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
common,
dial_state::DialState,
manager::{ConnectionManagerConfig, ConnectionManagerEvent},
metrics,
peer_connection,
},
multiaddr::Multiaddr,
Expand Down Expand Up @@ -193,6 +194,7 @@ where
dial_result: Result<PeerConnection, ConnectionManagerError>,
) {
let node_id = dial_state.peer().node_id.clone();
metrics::pending_connections(Some(&node_id), ConnectionDirection::Outbound).inc();

let removed = self.cancel_signals.remove(&node_id);
drop(removed);
Expand All @@ -213,6 +215,8 @@ where
},
}

metrics::pending_connections(Some(&node_id), ConnectionDirection::Outbound).dec();

if self.pending_dial_requests.contains_key(&node_id) {
self.reply_to_pending_requests(&node_id, dial_result.clone());
}
Expand Down
4 changes: 4 additions & 0 deletions comms/src/connection_manager/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::{
bounded_executor::BoundedExecutor,
connection_manager::{
liveness::LivenessSession,
metrics,
wire_mode::{WireMode, LIVENESS_WIRE_MODE},
},
multiaddr::Multiaddr,
Expand Down Expand Up @@ -239,6 +240,7 @@ where

let span = span!(Level::TRACE, "connection_mann::listener::inbound_task",);
let inbound_fut = async move {
metrics::pending_connections(None, ConnectionDirection::Inbound).inc();
match Self::read_wire_format(&mut socket, config.time_to_first_byte).await {
Ok(WireMode::Comms(byte)) if byte == config.network_info.network_byte => {
let this_node_id_str = node_identity.node_id().short_str();
Expand Down Expand Up @@ -325,6 +327,8 @@ where
);
},
}

metrics::pending_connections(None, ConnectionDirection::Inbound).dec();
}
.instrument(span);

Expand Down
20 changes: 17 additions & 3 deletions comms/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use super::{
};
use crate::{
backoff::Backoff,
connection_manager::{metrics, ConnectionDirection},
multiplexing::Substream,
noise::NoiseConfig,
peer_manager::{NodeId, NodeIdentity},
Expand Down Expand Up @@ -397,10 +398,14 @@ where
node_id.short_str(),
proto_str
);
metrics::inbound_substream_counter(&node_id, &protocol).inc();
let notify_fut = self
.protocols
.notify(&protocol, ProtocolEvent::NewInboundSubstream(node_id, stream));
match time::timeout(Duration::from_secs(10), notify_fut).await {
Ok(Ok(_)) => {
debug!(target: LOG_TARGET, "Protocol notification for '{}' sent", proto_str);
},
Ok(Err(err)) => {
error!(
target: LOG_TARGET,
Expand All @@ -413,12 +418,21 @@ where
"Error sending NewSubstream notification for protocol '{}' because {}", proto_str, err
);
},
_ => {
debug!(target: LOG_TARGET, "Protocol notification for '{}' sent", proto_str);
},
}
},

PeerConnected(conn) => {
metrics::successful_connections(conn.peer_node_id(), conn.direction()).inc();
self.publish_event(PeerConnected(conn));
},
PeerConnectFailed(peer, err) => {
metrics::failed_connections(&peer, ConnectionDirection::Outbound).inc();
self.publish_event(PeerConnectFailed(peer, err));
},
PeerInboundConnectFailed(err) => {
metrics::failed_connections(&Default::default(), ConnectionDirection::Inbound).inc();
self.publish_event(PeerInboundConnectFailed(err));
},
event => {
self.publish_event(event);
},
Expand Down
82 changes: 82 additions & 0 deletions comms/src/connection_manager/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2021, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{connection_manager::ConnectionDirection, peer_manager::NodeId, protocol::ProtocolId};
use once_cell::sync::Lazy;
use tari_metrics::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec};

/// Returns the `comms::connections::pending` gauge for the given peer and connection direction.
///
/// The underlying gauge vector is registered with `tari_metrics` the first time this function is called and is
/// reused afterwards. When `peer` is `None`, the `peer_id` label is set to `"unknown"`.
///
/// # Panics
///
/// Panics on first use if registering the gauge vector with `tari_metrics` fails.
pub fn pending_connections(peer: Option<&NodeId>, direction: ConnectionDirection) -> IntGauge {
    static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
        tari_metrics::register_int_gauge_vec(
            "comms::connections::pending",
            // The help text previously read "Number of active connections by direction", which was copied from
            // another metric and did not describe this gauge or its `peer_id` label.
            "Number of pending connections by peer and direction",
            &["peer_id", "direction"],
        )
        .unwrap()
    });

    // A peer that has not been identified yet is reported under a fixed placeholder label.
    let peer_label = peer.map_or_else(|| "unknown".to_string(), ToString::to_string);

    METER.with_label_values(&[peer_label.as_str(), direction.as_str()])
}

/// Returns the `comms::connections::success` counter for the given peer and connection direction.
///
/// The underlying counter vector is registered with `tari_metrics` the first time this function is called and is
/// reused afterwards.
///
/// # Panics
///
/// Panics on first use if registering the counter vector with `tari_metrics` fails.
pub fn successful_connections(peer: &NodeId, direction: ConnectionDirection) -> IntCounter {
    static METER: Lazy<IntCounterVec> = Lazy::new(|| {
        tari_metrics::register_int_counter_vec(
            "comms::connections::success",
            // The help text previously read "Number of active connections by direction", which was copied from
            // another metric and did not describe this counter or its `peer_id` label.
            "Number of successful connections by peer and direction",
            &["peer_id", "direction"],
        )
        .unwrap()
    });

    METER.with_label_values(&[peer.to_string().as_str(), direction.as_str()])
}

/// Returns the `comms::connections::failed` counter for the given peer and connection direction.
///
/// The underlying counter vector is registered with `tari_metrics` the first time this function is called and is
/// reused afterwards.
///
/// # Panics
///
/// Panics on first use if registering the counter vector with `tari_metrics` fails.
pub fn failed_connections(peer: &NodeId, direction: ConnectionDirection) -> IntCounter {
    static METER: Lazy<IntCounterVec> = Lazy::new(|| {
        tari_metrics::register_int_counter_vec(
            "comms::connections::failed",
            // The help text previously read "Number of active connections by direction", which was copied from
            // another metric and did not describe this counter or its `peer_id` label.
            "Number of failed connections by peer and direction",
            &["peer_id", "direction"],
        )
        .unwrap()
    });

    METER.with_label_values(&[peer.to_string().as_str(), direction.as_str()])
}

/// Returns the `comms::connections::inbound_substream_request_count` counter for the given peer and protocol.
///
/// The counter vector is registered with `tari_metrics` on first use. The `protocol` label is the protocol
/// identifier decoded as UTF-8, with invalid sequences replaced (lossy conversion).
///
/// # Panics
///
/// Panics on first use if registering the counter vector with `tari_metrics` fails.
pub fn inbound_substream_counter(peer: &NodeId, protocol: &ProtocolId) -> IntCounter {
    static METER: Lazy<IntCounterVec> = Lazy::new(|| {
        tari_metrics::register_int_counter_vec(
            "comms::connections::inbound_substream_request_count",
            "Number of substream requests",
            &["peer_id", "protocol"],
        )
        .unwrap()
    });

    // Resolve (and, on first call, register) the counter vector before building the label values.
    let counters: &IntCounterVec = &METER;
    let peer_label = peer.to_string();
    let protocol_label = String::from_utf8_lossy(protocol);

    counters.with_label_values(&[peer_label.as_str(), protocol_label.as_ref()])
}
1 change: 1 addition & 0 deletions comms/src/connection_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
mod dial_state;
mod dialer;
mod listener;
mod metrics;

mod common;
pub use common::validate_peer_addresses;
Expand Down
5 changes: 1 addition & 4 deletions comms/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,10 +510,7 @@ impl ConnectivityManagerActor {
_ => {},
}
},
#[cfg(feature = "metrics")]
NewInboundSubstream(node_id, protocol, _) => {
super::metrics::substream_request_count(node_id, protocol).inc();
},

_ => {},
}

Expand Down
21 changes: 6 additions & 15 deletions comms/src/connectivity/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,23 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{connection_manager::ConnectionDirection, peer_manager::NodeId, protocol::ProtocolId};
use crate::connection_manager::ConnectionDirection;
use once_cell::sync::Lazy;
use tari_metrics::{IntGauge, IntGaugeVec};

pub fn connections(direction: ConnectionDirection) -> IntGauge {
static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
tari_metrics::register_int_gauge_vec("comms::connections", "Number of active connections by direction", &[
"direction",
])
tari_metrics::register_int_gauge_vec(
"comms::connectivity::num_connections",
"Number of active connections by direction",
&["direction"],
)
.unwrap()
});

METER.with_label_values(&[direction.as_str()])
}

pub fn substream_request_count(peer: &NodeId, protocol: &ProtocolId) -> IntGauge {
static METER: Lazy<IntGaugeVec> = Lazy::new(|| {
tari_metrics::register_int_gauge_vec("comms::substream_request_count", "Number of substream requests", &[
"peer", "protocol",
])
.unwrap()
});

METER.with_label_values(&[peer.to_string().as_str(), String::from_utf8_lossy(protocol).as_ref()])
}

pub fn uptime() -> IntGauge {
static METER: Lazy<IntGauge> =
Lazy::new(|| tari_metrics::register_int_gauge("comms::uptime", "Comms uptime").unwrap());
Expand Down

0 comments on commit 6df9aae

Please sign in to comment.