From eb97023beae46c60abf99b4d2cd637cbc7b0447b Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Tue, 26 Oct 2021 16:23:42 +0200 Subject: [PATCH] Add mempool metrics (#3664) * fix fragment size conversion * add new metrics * implement new metrics for prometheus * update chain-libs --- Cargo.lock | 58 ++++++++++++------- jormungandr-lib/src/interfaces/stats.rs | 2 + jormungandr/src/fragment/pool.rs | 17 +++++- jormungandr/src/fragment/selection.rs | 23 ++++---- .../metrics/backends/prometheus_exporter.rs | 12 ++++ .../src/metrics/backends/simple_counter.rs | 30 +++++++++- jormungandr/src/metrics/mod.rs | 2 + 7 files changed, 112 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bebfc7bc30..df100dec80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -445,7 +445,7 @@ dependencies = [ [[package]] name = "cardano-legacy-address" version = "0.1.1" -source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#39e1f7f4e3d76df61bfbc24d8ce4b72a02c9ecc4" +source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#42b6cdfbc87a7738526f7bbfc6efcacdae8aef07" dependencies = [ "cbor_event", "chain-ser", @@ -489,7 +489,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chain-addr" version = "0.1.0" -source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#39e1f7f4e3d76df61bfbc24d8ce4b72a02c9ecc4" +source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#42b6cdfbc87a7738526f7bbfc6efcacdae8aef07" dependencies = [ "bech32 0.8.1", "chain-core", @@ -503,7 +503,7 @@ dependencies = [ [[package]] name = "chain-core" version = "0.1.0" -source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#39e1f7f4e3d76df61bfbc24d8ce4b72a02c9ecc4" +source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#42b6cdfbc87a7738526f7bbfc6efcacdae8aef07" dependencies = [ "chain-ser", ] @@ -511,7 +511,7 @@ dependencies = [ [[package]] name = "chain-crypto" version = "0.1.0" -source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#39e1f7f4e3d76df61bfbc24d8ce4b72a02c9ecc4" +source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#42b6cdfbc87a7738526f7bbfc6efcacdae8aef07" dependencies = [ "bech32 0.8.1", "cryptoxide", @@ -534,7 +534,7 @@ dependencies = [ [[package]] name = "chain-impl-mockchain" version = "0.1.0" -source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#39e1f7f4e3d76df61bfbc24d8ce4b72a02c9ecc4" +source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#42b6cdfbc87a7738526f7bbfc6efcacdae8aef07" dependencies = [ "cardano-legacy-address", "chain-addr", @@ -554,8 +554,8 @@ dependencies = [ "rand_core 0.6.3", "rayon", "sparse-array", - "strum", - "strum_macros", + "strum 0.22.0", + "strum_macros 0.22.0", "thiserror", "typed-bytes", ] @@ -563,7 +563,7 @@ dependencies = [ [[package]] name = "chain-network" version = "0.1.0" -source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#39e1f7f4e3d76df61bfbc24d8ce4b72a02c9ecc4" +source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#42b6cdfbc87a7738526f7bbfc6efcacdae8aef07" dependencies = [ "async-trait", "chain-crypto", @@ -580,12 +580,12 @@ dependencies = [ [[package]] name = "chain-ser" version = "0.1.0" -source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#39e1f7f4e3d76df61bfbc24d8ce4b72a02c9ecc4" +source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#42b6cdfbc87a7738526f7bbfc6efcacdae8aef07" [[package]] name = "chain-storage" version = "0.1.0" -source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#39e1f7f4e3d76df61bfbc24d8ce4b72a02c9ecc4" +source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#42b6cdfbc87a7738526f7bbfc6efcacdae8aef07" dependencies = [ "criterion", "data-pile", @@ -598,7 +598,7 @@ dependencies = [ [[package]] name = "chain-time" version = "0.1.0" -source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#39e1f7f4e3d76df61bfbc24d8ce4b72a02c9ecc4" +source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#42b6cdfbc87a7738526f7bbfc6efcacdae8aef07" dependencies = [ "chain-core", "chain-ser", @@ -610,9 +610,9 @@ dependencies = [ [[package]] name = "chain-vote" version = "0.1.0" -source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#39e1f7f4e3d76df61bfbc24d8ce4b72a02c9ecc4" +source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#42b6cdfbc87a7738526f7bbfc6efcacdae8aef07" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 0.1.10", "chain-core", "chain-crypto", "const_format", @@ -1779,7 +1779,7 @@ dependencies = [ [[package]] name = "imhamt" version = "0.1.0" -source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#39e1f7f4e3d76df61bfbc24d8ce4b72a02c9ecc4" +source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#42b6cdfbc87a7738526f7bbfc6efcacdae8aef07" dependencies = [ "thiserror", ] @@ -2113,7 +2113,7 @@ dependencies = [ "serde_derive", "serde_json", "serde_yaml", - "strum", + "strum 0.21.0", "symmetric-cipher", "sysinfo 0.20.3", "tar", @@ -3667,7 +3667,7 @@ dependencies = [ [[package]] name = "sparse-array" version = "0.1.0" -source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#39e1f7f4e3d76df61bfbc24d8ce4b72a02c9ecc4" +source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#42b6cdfbc87a7738526f7bbfc6efcacdae8aef07" [[package]] name = "spin" @@ -3752,9 +3752,15 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aaf86bbcfd1fa9670b7a129f64fc0c9fcbbfe4f1bc4210e9e98fe71ffc12cde2" dependencies = [ - "strum_macros", + "strum_macros 0.21.1", ] +[[package]] +name = "strum" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7ac893c7d471c8a21f31cfe213ec4f6d9afeed25537c772e08ef3f005f8729e" + [[package]] name = "strum_macros" version = "0.21.1" @@ -3767,6 +3773,18 @@ dependencies = [ "syn 1.0.74", ] +[[package]] +name = "strum_macros" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339f799d8b549e3744c7ac7feb216383e4005d94bdb22561b3ab8f3b808ae9fb" +dependencies = [ + "heck", + "proc-macro2 1.0.28", + "quote 1.0.9", + "syn 1.0.74", +] + [[package]] name = "subtle" version = "2.4.1" @@ -4129,9 +4147,9 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d12faebbe071b06f486be82cc9318350814fdd07fcb28f3690840cd770599283" +checksum = "12b52d07035516c2b74337d2ac7746075e7dcae7643816c1b12c5ff8a7484c08" dependencies = [ "proc-macro2 1.0.28", "prost-build", @@ -4349,7 +4367,7 @@ dependencies = [ [[package]] name = "typed-bytes" version = "0.1.0" -source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#39e1f7f4e3d76df61bfbc24d8ce4b72a02c9ecc4" +source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#42b6cdfbc87a7738526f7bbfc6efcacdae8aef07" [[package]] name = "typenum" diff --git a/jormungandr-lib/src/interfaces/stats.rs b/jormungandr-lib/src/interfaces/stats.rs index bd303f8c17..7b896c4464 100644 --- a/jormungandr-lib/src/interfaces/stats.rs +++ b/jormungandr-lib/src/interfaces/stats.rs @@ -23,12 +23,14 @@ pub struct NodeStats { pub last_block_time: Option, pub last_block_tx: u64, pub last_received_block_time: Option, + pub block_content_size_avg: u32, pub peer_available_cnt: usize, pub peer_connected_cnt: usize, pub peer_quarantined_cnt: usize, pub peer_total_cnt: usize, pub tx_recv_cnt: u64, pub tx_pending: u64, + pub tx_pending_total_size: u64, pub tx_rejected_cnt: u64, pub votes_cast: u64, pub uptime: Option, diff --git a/jormungandr/src/fragment/pool.rs b/jormungandr/src/fragment/pool.rs index a4c08ba795..46464dbb8d 100644 --- a/jormungandr/src/fragment/pool.rs +++ b/jormungandr/src/fragment/pool.rs @@ -548,6 +548,7 @@ pub(super) mod internal { entries: IndexedDeqeue, timeout_queue: BTreeSet, max_entries: usize, + total_size_bytes: usize, } impl Pool { @@ -558,6 +559,7 @@ pub(super) mod internal { // out of their order in a queue. BinaryHeap does not allow that. timeout_queue: BTreeSet::new(), max_entries, + total_size_bytes: 0, } } @@ -574,6 +576,7 @@ pub(super) mod internal { if self.entries.contains(&fragment_id) { false } else { + self.total_size_bytes += fragment.serialized_size(); self.timeout_queue_insert(fragment); self.entries.push_front(fragment_id, fragment.clone()); true @@ -588,6 +591,7 @@ pub(super) mod internal { let maybe_fragment = self.entries.remove(fragment_id); if let Some(fragment) = maybe_fragment { self.timeout_queue_remove(&fragment); + self.total_size_bytes -= fragment.serialized_size(); } } } @@ -595,12 +599,14 @@ pub(super) mod internal { pub fn remove_oldest(&mut self) -> Option { let fragment = self.entries.pop_back().map(|(_, value)| value)?; self.timeout_queue_remove(&fragment); + self.total_size_bytes -= fragment.serialized_size(); Some(fragment) } pub fn return_to_pool(&mut self, fragments: impl IntoIterator) { for fragment in fragments.into_iter() { self.timeout_queue_insert(&fragment); + self.total_size_bytes += fragment.serialized_size(); self.entries.push_back(fragment.id(), fragment); } } @@ -634,7 +640,9 @@ pub(super) mod internal { .collect(); for item in &to_remove { self.timeout_queue.remove(item); - self.entries.remove(&item.id); + if let Some(fragment) = self.entries.remove(&item.id) { + self.total_size_bytes -= fragment.serialized_size(); + } } to_remove.into_iter().map(|x| x.id).collect() // TODO convert to something like this when .first() and .pop_first() are stabilized. This does not have unnecessary clones. @@ -688,6 +696,13 @@ pub(super) mod internal { ]; let mut pool = Pool::new(4); assert_eq!(fragments1, pool.insert_all(fragments1.clone())); + assert_eq!( + pool.total_size_bytes, + fragments1 + .iter() + .map(|f| f.to_raw().size_bytes_plus_size()) + .sum::() + ); assert_eq!(fragments2_expected, pool.insert_all(fragments2)); for expected in final_expected.into_iter() { assert_eq!(expected, pool.remove_oldest().unwrap()); diff --git a/jormungandr/src/fragment/selection.rs b/jormungandr/src/fragment/selection.rs index 4bfa4ca72b..d46ba476e3 100644 --- a/jormungandr/src/fragment/selection.rs +++ b/jormungandr/src/fragment/selection.rs @@ -79,16 +79,19 @@ async fn try_apply_fragment( mut space_left: u32, ) -> Result { use futures::future::{select, Either}; - let fragment_raw = fragment.to_raw(); // TODO: replace everything to FragmentRaw in the node - let fragment_size = fragment_raw.size_bytes_plus_size() as u32; - - if fragment_size > ledger_params.block_content_max_size { - let reason = format!( - "fragment size {} exceeds maximum block content size {}", - fragment_size, ledger_params.block_content_max_size - ); - return Err(ApplyFragmentError::Rejected(reason)); - } + use std::convert::TryFrom; + + let raw_fragment_size = fragment.serialized_size(); + let fragment_size = match u32::try_from(raw_fragment_size) { + Ok(size) if size <= ledger_params.block_content_max_size => size, + _ => { + let reason = format!( + "fragment size {} exceeds maximum block content size {}", + raw_fragment_size, ledger_params.block_content_max_size + ); + return Err(ApplyFragmentError::Rejected(reason)); + } + }; if fragment_size > space_left { // return a fragment to the pool later if does not fit the contents size limit diff --git a/jormungandr/src/metrics/backends/prometheus_exporter.rs b/jormungandr/src/metrics/backends/prometheus_exporter.rs index eb5205b339..ba33585159 100644 --- a/jormungandr/src/metrics/backends/prometheus_exporter.rs +++ b/jormungandr/src/metrics/backends/prometheus_exporter.rs @@ -21,6 +21,7 @@ pub struct Prometheus { tx_recv_cnt: IntCounter, tx_rejected_cnt: IntCounter, tx_pending_cnt: UIntGauge, + tx_pending_size_bytes_total: UIntGauge, votes_casted_cnt: IntCounter, block_recv_cnt: IntCounter, peer_connected_cnt: UIntGauge, @@ -80,6 +81,11 @@ impl Default for Prometheus { registry.register(Box::new(tx_recv_cnt.clone())).unwrap(); let tx_pending_cnt = UIntGauge::new("txPending", "txPending").unwrap(); registry.register(Box::new(tx_pending_cnt.clone())).unwrap(); + let tx_pending_size_bytes_total = + UIntGauge::new("txPendingSizeBytesTotal", "txPendingSizeBytesTotal").unwrap(); + registry + .register(Box::new(tx_pending_size_bytes_total.clone())) + .unwrap(); let tx_rejected_cnt = IntCounter::new("txRejectedCnt", "txRejectedCnt").unwrap(); registry .register(Box::new(tx_rejected_cnt.clone())) @@ -150,6 +156,7 @@ impl Default for Prometheus { tx_recv_cnt, tx_rejected_cnt, tx_pending_cnt, + tx_pending_size_bytes_total, votes_casted_cnt, block_recv_cnt, peer_connected_cnt, @@ -187,6 +194,11 @@ impl MetricsBackend for Prometheus { self.tx_pending_cnt.set(count); } + fn set_tx_pending_total_size(&self, size: usize) { + let size = size.try_into().unwrap(); + self.tx_pending_size_bytes_total.set(size); + } + fn add_block_recv_cnt(&self, count: usize) { let count = count.try_into().unwrap(); self.block_recv_cnt.inc_by(count); diff --git a/jormungandr/src/metrics/backends/simple_counter.rs b/jormungandr/src/metrics/backends/simple_counter.rs index bbfd5f3238..2f4f225075 100644 --- a/jormungandr/src/metrics/backends/simple_counter.rs +++ b/jormungandr/src/metrics/backends/simple_counter.rs @@ -15,10 +15,13 @@ use std::time::Instant; use arc_swap::ArcSwapOption; +const EXP_MOVING_AVERAGE_COEFF: f64 = 0.5; + pub struct SimpleCounter { tx_recv_cnt: AtomicUsize, tx_rejected_cnt: AtomicUsize, tx_pending_cnt: AtomicUsize, + tx_pending_total_size: AtomicUsize, votes_cast: AtomicU64, block_recv_cnt: AtomicUsize, slot_start_time: AtomicU64, @@ -34,6 +37,7 @@ struct BlockCounters { block_input_sum: u64, block_fee_sum: u64, content_size: u32, + avg_content_size: u32, date: String, hash: String, chain_length: String, @@ -70,6 +74,7 @@ impl SimpleCounter { last_received_block_time: Some(SystemTime::from_secs_since_epoch( self.slot_start_time.load(Ordering::Relaxed), )), + block_content_size_avg: block_data.map(|bd| bd.avg_content_size).unwrap_or_default(), peer_available_cnt, peer_connected_cnt: self.peers_connected_cnt.load(Ordering::Relaxed), peer_quarantined_cnt, @@ -80,6 +85,11 @@ impl SimpleCounter { .load(Ordering::Relaxed) .try_into() .unwrap(), + tx_pending_total_size: self + .tx_pending_total_size + .load(Ordering::Relaxed) + .try_into() + .unwrap(), tx_rejected_cnt: self .tx_rejected_cnt .load(Ordering::Relaxed) @@ -97,6 +107,7 @@ impl Default for SimpleCounter { tx_recv_cnt: Default::default(), tx_rejected_cnt: Default::default(), tx_pending_cnt: Default::default(), + tx_pending_total_size: Default::default(), votes_cast: Default::default(), block_recv_cnt: Default::default(), slot_start_time: Default::default(), @@ -109,6 +120,11 @@ impl Default for SimpleCounter { } } +fn calc_running_block_size_average(last_avg: u32, new_value: u32) -> u32 { + (last_avg as f64 * (1.0 - EXP_MOVING_AVERAGE_COEFF) + + new_value as f64 * EXP_MOVING_AVERAGE_COEFF) as u32 +} + impl MetricsBackend for SimpleCounter { fn add_tx_recv_cnt(&self, count: usize) { self.tx_recv_cnt.fetch_add(count, Ordering::Relaxed); @@ -122,6 +138,10 @@ impl MetricsBackend for SimpleCounter { self.tx_pending_cnt.store(count, Ordering::Relaxed); } + fn set_tx_pending_total_size(&self, size: usize) { + self.tx_pending_total_size.store(size, Ordering::Relaxed); + } + fn add_block_recv_cnt(&self, count: usize) { self.block_recv_cnt.fetch_add(count, Ordering::Relaxed); } @@ -194,11 +214,19 @@ impl MetricsBackend for SimpleCounter { }) .expect("should be good"); + let content_size = block.header().block_content_size(); + let last_avg = if let Some(data) = self.tip_block.load().as_deref() { + data.avg_content_size + } else { + content_size // jump start moving average from first known value + }; + let block_data = BlockCounters { block_tx_count, block_input_sum: block_input_sum.0, block_fee_sum: block_fee_sum.0, - content_size: block.header().block_content_size(), + content_size, + avg_content_size: calc_running_block_size_average(last_avg, content_size), date: block.header().block_date().to_string(), hash: block.header().hash().to_string(), chain_length: block.header().chain_length().to_string(), diff --git a/jormungandr/src/metrics/mod.rs b/jormungandr/src/metrics/mod.rs index 728e77e1ca..774d1a8b05 100644 --- a/jormungandr/src/metrics/mod.rs +++ b/jormungandr/src/metrics/mod.rs @@ -10,6 +10,7 @@ pub mod backends; pub trait MetricsBackend { fn add_tx_recv_cnt(&self, count: usize); fn set_tx_pending_cnt(&self, count: usize); + fn set_tx_pending_total_size(&self, size: usize); fn add_tx_rejected_cnt(&self, count: usize); fn add_block_recv_cnt(&self, count: usize); fn add_peer_connected_cnt(&self, count: usize); @@ -70,6 +71,7 @@ impl MetricsBackend for Metrics { metrics_count_method!(add_tx_recv_cnt); metrics_count_method!(add_tx_rejected_cnt); metrics_count_method!(set_tx_pending_cnt); + metrics_count_method!(set_tx_pending_total_size); metrics_count_method!(add_block_recv_cnt); metrics_count_method!(add_peer_connected_cnt); metrics_count_method!(sub_peer_connected_cnt);