Skip to content

Commit

Permalink
even more metrics to compile out
Browse files Browse the repository at this point in the history
  • Loading branch information
brianp committed Nov 10, 2023
1 parent e494f69 commit 1fa74d9
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 28 deletions.
62 changes: 42 additions & 20 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#[cfg(feature = "metrics")]
use std::convert::{TryFrom, TryInto};
use std::{
cmp::max,
collections::HashSet,
convert::{TryFrom, TryInto},
sync::Arc,
time::{Duration, Instant},
};
Expand All @@ -35,17 +36,16 @@ use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};
use tari_utilities::hex::Hex;
use tokio::sync::RwLock;

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::{
comms_interface::{
error::CommsInterfaceError,
local_interface::BlockEventSender,
FetchMempoolTransactionsResponse,
NodeCommsRequest,
NodeCommsResponse,
OutboundNodeCommsInterface,
},
metrics,
base_node::comms_interface::{
error::CommsInterfaceError,
local_interface::BlockEventSender,
FetchMempoolTransactionsResponse,
NodeCommsRequest,
NodeCommsResponse,
OutboundNodeCommsInterface,
},
blocks::{Block, BlockBuilder, BlockHeader, BlockHeaderValidationError, ChainBlock, NewBlock, NewBlockTemplate},
chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend, ChainStorageError},
Expand Down Expand Up @@ -619,6 +619,7 @@ where B: BlockchainBackend + 'static
.build();
return Ok(block);
}
#[cfg(feature = "metrics")]
metrics::compact_block_tx_misses(header.height).set(excess_sigs.len() as i64);
let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
return Ok(block);
Expand All @@ -628,6 +629,7 @@ where B: BlockchainBackend + 'static
let (known_transactions, missing_excess_sigs) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
let known_transactions = known_transactions.into_iter().map(|tx| (*tx).clone()).collect();

#[cfg(feature = "metrics")]
metrics::compact_block_tx_misses(header.height).set(missing_excess_sigs.len() as i64);

let mut builder = BlockBuilder::new(header.version)
Expand Down Expand Up @@ -673,6 +675,7 @@ where B: BlockchainBackend + 'static
not_found.len()
);

#[cfg(feature = "metrics")]
metrics::compact_block_full_misses(header.height).inc();
let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
return Ok(block);
Expand Down Expand Up @@ -710,6 +713,7 @@ where B: BlockchainBackend + 'static
e,
);

#[cfg(feature = "metrics")]
metrics::compact_block_mmr_mismatch(header.height).inc();
let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
return Ok(block);
Expand Down Expand Up @@ -834,8 +838,11 @@ where B: BlockchainBackend + 'static
},

Err(e @ ChainStorageError::ValidationError { .. }) => {
let block_hash = block.hash();
metrics::rejected_blocks(block.header.height, &block_hash).inc();
#[cfg(feature = "metrics")]
{
let block_hash = block.hash();
metrics::rejected_blocks(block.header.height, &block_hash).inc();
}
warn!(
target: LOG_TARGET,
"Peer {} sent an invalid block: {}",
Expand All @@ -856,14 +863,20 @@ where B: BlockchainBackend + 'static
}
},
// SECURITY: This indicates an issue in the transaction validator.
None => metrics::rejected_local_blocks(block.header.height, &block_hash).inc(),
None => {
#[cfg(feature = "metrics")]
metrics::rejected_local_blocks(block.header.height, &block_hash).inc();
debug!(target: LOG_TARGET, "There may have been an issue in the transaction validator");
},
}
self.publish_block_event(BlockEvent::AddBlockValidationFailed { block, source_peer });
Err(e.into())
},

Err(e) => {
#[cfg(feature = "metrics")]
metrics::rejected_blocks(block.header.height, &block.hash()).inc();

self.publish_block_event(BlockEvent::AddBlockErrored { block });
Err(e.into())
},
Expand Down Expand Up @@ -936,6 +949,7 @@ where B: BlockchainBackend + 'static

async fn update_block_result_metrics(&self, block_add_result: &BlockAddResult) -> Result<(), CommsInterfaceError> {
fn update_target_difficulty(block: &ChainBlock) {
#[cfg(feature = "metrics")]
match block.header().pow_algo() {
PowAlgorithm::Sha3x => {
metrics::target_difficulty_sha()
Expand All @@ -950,25 +964,33 @@ where B: BlockchainBackend + 'static

match block_add_result {
BlockAddResult::Ok(ref block) => {
#[allow(clippy::cast_possible_wrap)]
metrics::tip_height().set(block.height() as i64);
update_target_difficulty(block);
let utxo_set_size = self.blockchain_db.utxo_count().await?;
metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));

#[cfg(feature = "metrics")]
{
#[allow(clippy::cast_possible_wrap)]
metrics::tip_height().set(block.height() as i64);
let utxo_set_size = self.blockchain_db.utxo_count().await?;
metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
}
},
#[allow(unused_variables)] // `removed` variable is used if metrics are compiled
BlockAddResult::ChainReorg { added, removed } => {
#[cfg(feature = "metrics")]
if let Some(fork_height) = added.last().map(|b| b.height()) {
#[allow(clippy::cast_possible_wrap)]
metrics::tip_height().set(fork_height as i64);
metrics::reorg(fork_height, added.len(), removed.len()).inc();

let utxo_set_size = self.blockchain_db.utxo_count().await?;
metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
}
for block in added {
update_target_difficulty(block);
}
let utxo_set_size = self.blockchain_db.utxo_count().await?;
metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
},
BlockAddResult::OrphanBlock => {
#[cfg(feature = "metrics")]
metrics::orphaned_blocks().inc();
},
_ => {},
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub mod chain_metadata_service;
pub mod comms_interface;
#[cfg(feature = "base_node")]
pub use comms_interface::LocalNodeCommsInterface;
#[cfg(feature = "base_node")]
#[cfg(feature = "metrics")]
mod metrics;

#[cfg(feature = "base_node")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ use std::time::Instant;

use log::*;

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::{
comms_interface::BlockEvent,
metrics,
state_machine_service::states::{BlockSyncInfo, HorizonStateSync, StateEvent, StateInfo, StatusInfo},
sync::{BlockSynchronizer, SyncPeer},
BaseNodeStateMachine,
Expand Down Expand Up @@ -63,6 +64,7 @@ impl BlockSync {
let local_nci = shared.local_node_interface.clone();
let randomx_vm_cnt = shared.get_randomx_vm_cnt();
let randomx_vm_flags = shared.get_randomx_vm_flags();
#[cfg(feature = "metrics")]
let tip_height_metric = metrics::tip_height();
synchronizer.on_starting(move |sync_peer| {
let _result = status_event_sender.send(StatusInfo {
Expand All @@ -81,6 +83,7 @@ impl BlockSync {
BlockAddResult::Ok(block),
));

#[cfg(feature = "metrics")]
tip_height_metric.set(local_height as i64);
let _result = status_event_sender.send(StatusInfo {
bootstrapped,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use log::*;
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::NodeId;

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::{
comms_interface::BlockEvent,
metrics,
state_machine_service::states::{BlockSyncInfo, StateEvent, StateInfo, StatusInfo},
sync::{BlockHeaderSyncError, HeaderSynchronizer, SyncPeer},
BaseNodeStateMachine,
Expand Down Expand Up @@ -146,6 +147,7 @@ impl HeaderSyncState {

let local_nci = shared.local_node_interface.clone();
synchronizer.on_rewind(move |removed| {
#[cfg(feature = "metrics")]
if let Some(fork_height) = removed.last().map(|b| b.height().saturating_sub(1)) {
metrics::tip_height().set(fork_height as i64);
metrics::reorg(fork_height, 0, removed.len()).inc();
Expand Down
7 changes: 6 additions & 1 deletion base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ use tokio::{
};
use tracing::{instrument, span, Instrument, Level};

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::{
comms_interface::{BlockEvent, BlockEvent::BlockSyncRewind},
metrics,
sync::{
header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService},
Expand Down Expand Up @@ -99,6 +100,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncRpcService<B> {

let token = Arc::new(peer);
lock.push(Arc::downgrade(&token));
#[cfg(feature = "metrics")]
metrics::active_sync_peers().set(lock.len() as i64);
Ok(token)
}
Expand Down Expand Up @@ -256,6 +258,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

#[cfg(feature = "metrics")]
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -355,6 +358,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

#[cfg(feature = "metrics")]
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -572,6 +576,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

#[cfg(feature = "metrics")]
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
Expand Down
4 changes: 3 additions & 1 deletion base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ use tari_comms::{
use tari_utilities::hex::Hex;
use tokio::{sync::mpsc, task};

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::metrics,
blocks::BlockHeader,
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
proto::base_node::{SyncUtxosRequest, SyncUtxosResponse},
Expand Down Expand Up @@ -106,6 +107,7 @@ where B: BlockchainBackend + 'static
target: LOG_TARGET,
"UTXO stream completed for peer '{}'", self.peer_node_id
);
#[cfg(feature = "metrics")]
metrics::active_sync_peers().dec();
});

Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ mod rpc;
pub use rpc::create_mempool_rpc_service;
#[cfg(feature = "base_node")]
pub use rpc::{MempoolRpcClient, MempoolRpcServer, MempoolRpcService, MempoolService};
#[cfg(feature = "base_node")]
#[cfg(feature = "metrics")]
mod metrics;
#[cfg(feature = "base_node")]
mod shrink_hashmap;
Expand Down
5 changes: 4 additions & 1 deletion base_layer/core/src/mempool/service/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ use log::*;
use tari_comms::peer_manager::NodeId;
use tari_utilities::hex::Hex;

#[cfg(feature = "metrics")]
use crate::mempool::metrics;
use crate::{
base_node::comms_interface::{BlockEvent, BlockEvent::AddBlockErrored},
chain_storage::BlockAddResult,
mempool::{
metrics,
service::{MempoolRequest, MempoolResponse, MempoolServiceError, OutboundMempoolServiceInterface},
Mempool,
TxStorageResponse,
Expand Down Expand Up @@ -135,6 +136,7 @@ impl MempoolInboundHandlers {
}
match self.mempool.insert(tx.clone()).await {
Ok(tx_storage) => {
#[cfg(feature = "metrics")]
if tx_storage.is_stored() {
metrics::inbound_transactions(source_peer.as_ref()).inc();
} else {
Expand Down Expand Up @@ -164,6 +166,7 @@ impl MempoolInboundHandlers {

#[allow(clippy::cast_possible_wrap)]
async fn update_pool_size_metrics(&self) {
#[cfg(feature = "metrics")]
if let Ok(stats) = self.mempool.stats().await {
metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
metrics::reorg_pool_size().set(stats.reorg_txs as i64);
Expand Down
7 changes: 6 additions & 1 deletion base_layer/core/src/mempool/sync_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ use tokio::{
time,
};

#[cfg(feature = "metrics")]
use crate::mempool::metrics;
use crate::{
base_node::comms_interface::{BlockEvent, BlockEventReceiver},
chain_storage::BlockAddResult,
mempool::{metrics, proto, Mempool, MempoolServiceConfig},
mempool::{proto, Mempool, MempoolServiceConfig},
proto as shared_proto,
transactions::transaction_components::Transaction,
};
Expand Down Expand Up @@ -544,6 +546,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin

#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_possible_wrap)]
#[cfg(feature = "metrics")]
{
let stats = self.mempool.stats().await?;
metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
Expand Down Expand Up @@ -580,6 +583,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin

let stored_result = self.mempool.insert(txn).await?;
if stored_result.is_stored() {
#[cfg(feature = "metrics")]
metrics::inbound_transactions(Some(&self.peer_node_id)).inc();
debug!(
target: LOG_TARGET,
Expand All @@ -588,6 +592,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin
self.peer_node_id.short_str()
);
} else {
#[cfg(feature = "metrics")]
metrics::rejected_inbound_transactions(Some(&self.peer_node_id)).inc();
debug!(
target: LOG_TARGET,
Expand Down

0 comments on commit 1fa74d9

Please sign in to comment.