Skip to content

Commit

Permalink
Add mempool metrics (#3664)
Browse files Browse the repository at this point in the history
* fix fragment size conversion

* add new metrics

* implement new metrics for prometheus

* update chain-libs
  • Loading branch information
zeegomo committed Oct 26, 2021
1 parent 90c5628 commit eb97023
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 32 deletions.
58 changes: 38 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions jormungandr-lib/src/interfaces/stats.rs
Expand Up @@ -23,12 +23,14 @@ pub struct NodeStats {
pub last_block_time: Option<SystemTime>,
pub last_block_tx: u64,
pub last_received_block_time: Option<SystemTime>,
pub block_content_size_avg: u32,
pub peer_available_cnt: usize,
pub peer_connected_cnt: usize,
pub peer_quarantined_cnt: usize,
pub peer_total_cnt: usize,
pub tx_recv_cnt: u64,
pub tx_pending: u64,
pub tx_pending_total_size: u64,
pub tx_rejected_cnt: u64,
pub votes_cast: u64,
pub uptime: Option<u64>,
Expand Down
17 changes: 16 additions & 1 deletion jormungandr/src/fragment/pool.rs
Expand Up @@ -548,6 +548,7 @@ pub(super) mod internal {
entries: IndexedDeqeue<FragmentId, Fragment>,
timeout_queue: BTreeSet<TimeoutQueueItem>,
max_entries: usize,
total_size_bytes: usize,
}

impl Pool {
Expand All @@ -558,6 +559,7 @@ pub(super) mod internal {
// out of their order in a queue. BinaryHeap does not allow that.
timeout_queue: BTreeSet::new(),
max_entries,
total_size_bytes: 0,
}
}

Expand All @@ -574,6 +576,7 @@ pub(super) mod internal {
if self.entries.contains(&fragment_id) {
false
} else {
self.total_size_bytes += fragment.serialized_size();
self.timeout_queue_insert(fragment);
self.entries.push_front(fragment_id, fragment.clone());
true
Expand All @@ -588,19 +591,22 @@ pub(super) mod internal {
let maybe_fragment = self.entries.remove(fragment_id);
if let Some(fragment) = maybe_fragment {
self.timeout_queue_remove(&fragment);
self.total_size_bytes -= fragment.serialized_size();
}
}
}

/// Evict and return the oldest fragment in the pool, or `None` when the
/// pool is empty. Keeps the timeout queue and the running byte-size
/// accounting in sync with the entry map.
pub fn remove_oldest(&mut self) -> Option<Fragment> {
    match self.entries.pop_back() {
        Some((_, oldest)) => {
            // Mirror the removal in the timeout index and the size counter
            // so all three views of the pool stay consistent.
            self.timeout_queue_remove(&oldest);
            self.total_size_bytes -= oldest.serialized_size();
            Some(oldest)
        }
        None => None,
    }
}

/// Re-insert fragments that were previously taken out of the pool (e.g.
/// fragments that did not fit into a block). Each fragment is restored to
/// the back of the queue, and the timeout index and total-size accounting
/// are updated to match.
pub fn return_to_pool(&mut self, fragments: impl IntoIterator<Item = Fragment>) {
    fragments.into_iter().for_each(|fragment| {
        self.total_size_bytes += fragment.serialized_size();
        self.timeout_queue_insert(&fragment);
        // Push to the back: returned fragments keep their relative age.
        self.entries.push_back(fragment.id(), fragment);
    });
}
Expand Down Expand Up @@ -634,7 +640,9 @@ pub(super) mod internal {
.collect();
for item in &to_remove {
self.timeout_queue.remove(item);
self.entries.remove(&item.id);
if let Some(fragment) = self.entries.remove(&item.id) {
self.total_size_bytes -= fragment.serialized_size();
}
}
to_remove.into_iter().map(|x| x.id).collect()
// TODO convert to something like this when .first() and .pop_first() are stabilized. This does not have unnecessary clones.
Expand Down Expand Up @@ -688,6 +696,13 @@ pub(super) mod internal {
];
let mut pool = Pool::new(4);
assert_eq!(fragments1, pool.insert_all(fragments1.clone()));
assert_eq!(
pool.total_size_bytes,
fragments1
.iter()
.map(|f| f.to_raw().size_bytes_plus_size())
.sum::<usize>()
);
assert_eq!(fragments2_expected, pool.insert_all(fragments2));
for expected in final_expected.into_iter() {
assert_eq!(expected, pool.remove_oldest().unwrap());
Expand Down
23 changes: 13 additions & 10 deletions jormungandr/src/fragment/selection.rs
Expand Up @@ -79,16 +79,19 @@ async fn try_apply_fragment(
mut space_left: u32,
) -> Result<NewLedgerState, ApplyFragmentError> {
use futures::future::{select, Either};
let fragment_raw = fragment.to_raw(); // TODO: replace everything to FragmentRaw in the node
let fragment_size = fragment_raw.size_bytes_plus_size() as u32;

if fragment_size > ledger_params.block_content_max_size {
let reason = format!(
"fragment size {} exceeds maximum block content size {}",
fragment_size, ledger_params.block_content_max_size
);
return Err(ApplyFragmentError::Rejected(reason));
}
use std::convert::TryFrom;

let raw_fragment_size = fragment.serialized_size();
let fragment_size = match u32::try_from(raw_fragment_size) {
Ok(size) if size <= ledger_params.block_content_max_size => size,
_ => {
let reason = format!(
"fragment size {} exceeds maximum block content size {}",
raw_fragment_size, ledger_params.block_content_max_size
);
return Err(ApplyFragmentError::Rejected(reason));
}
};

if fragment_size > space_left {
// return a fragment to the pool later if does not fit the contents size limit
Expand Down
12 changes: 12 additions & 0 deletions jormungandr/src/metrics/backends/prometheus_exporter.rs
Expand Up @@ -21,6 +21,7 @@ pub struct Prometheus {
tx_recv_cnt: IntCounter,
tx_rejected_cnt: IntCounter,
tx_pending_cnt: UIntGauge,
tx_pending_size_bytes_total: UIntGauge,
votes_casted_cnt: IntCounter,
block_recv_cnt: IntCounter,
peer_connected_cnt: UIntGauge,
Expand Down Expand Up @@ -80,6 +81,11 @@ impl Default for Prometheus {
registry.register(Box::new(tx_recv_cnt.clone())).unwrap();
let tx_pending_cnt = UIntGauge::new("txPending", "txPending").unwrap();
registry.register(Box::new(tx_pending_cnt.clone())).unwrap();
let tx_pending_size_bytes_total =
UIntGauge::new("txPendingSizeBytesTotal", "txPendingSizeBytesTotal").unwrap();
registry
.register(Box::new(tx_pending_size_bytes_total.clone()))
.unwrap();
let tx_rejected_cnt = IntCounter::new("txRejectedCnt", "txRejectedCnt").unwrap();
registry
.register(Box::new(tx_rejected_cnt.clone()))
Expand Down Expand Up @@ -150,6 +156,7 @@ impl Default for Prometheus {
tx_recv_cnt,
tx_rejected_cnt,
tx_pending_cnt,
tx_pending_size_bytes_total,
votes_casted_cnt,
block_recv_cnt,
peer_connected_cnt,
Expand Down Expand Up @@ -187,6 +194,11 @@ impl MetricsBackend for Prometheus {
self.tx_pending_cnt.set(count);
}

/// Record the aggregate serialized size (in bytes) of all pending mempool
/// fragments in the `txPendingSizeBytesTotal` gauge.
fn set_tx_pending_total_size(&self, size: usize) {
    // usize -> u64 conversion; only fails on a hypothetical >64-bit usize.
    let gauge_value = size.try_into().unwrap();
    self.tx_pending_size_bytes_total.set(gauge_value);
}

fn add_block_recv_cnt(&self, count: usize) {
let count = count.try_into().unwrap();
self.block_recv_cnt.inc_by(count);
Expand Down

0 comments on commit eb97023

Please sign in to comment.