From dae8ec4b4a13811a606bd1cf66607f7b1ef7606c Mon Sep 17 00:00:00 2001 From: fkrause98 Date: Tue, 18 Feb 2025 16:32:49 -0300 Subject: [PATCH 01/17] feat(store): use concurrent hashmap for mempool --- Cargo.lock | 5 ++-- crates/storage/store/Cargo.toml | 2 +- crates/storage/store/storage.rs | 45 ++++++++++----------------------- 3 files changed, 18 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d6b7e8b25aa..6a854f8c20c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3095,6 +3095,7 @@ dependencies = [ "hex-literal", "libmdbx", "redb", + "scc", "serde", "serde_json", "sha3", @@ -7432,9 +7433,9 @@ dependencies = [ [[package]] name = "scc" -version = "2.2.5" +version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66b202022bb57c049555430e11fc22fea12909276a80a4c3d368da36ac1d88ed" +checksum = "ea091f6cac2595aa38993f04f4ee692ed43757035c36e67c180b6828356385b1" dependencies = [ "sdd", ] diff --git a/crates/storage/store/Cargo.toml b/crates/storage/store/Cargo.toml index 1e9293aebd7..dae537517f6 100644 --- a/crates/storage/store/Cargo.toml +++ b/crates/storage/store/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" ethrex-rlp.workspace = true ethrex-common.workspace = true ethrex-trie.workspace = true - +scc = "2.3.3" ethereum-types.workspace = true anyhow = "1.0.86" bytes.workspace = true diff --git a/crates/storage/store/storage.rs b/crates/storage/store/storage.rs index e45a07c65da..809b6542651 100644 --- a/crates/storage/store/storage.rs +++ b/crates/storage/store/storage.rs @@ -15,6 +15,7 @@ use ethrex_common::types::{ use ethrex_rlp::decode::RLPDecode; use ethrex_rlp::encode::RLPEncode; use ethrex_trie::{Nibbles, Trie}; +use scc::HashMap as Map; use serde::{Deserialize, Serialize}; use sha3::{Digest as _, Keccak256}; use std::collections::{HashMap, HashSet}; @@ -35,7 +36,7 @@ pub const MAX_SNAPSHOT_READS: usize = 100; #[derive(Debug, Clone)] pub struct Store { engine: Arc, - pub mempool: Arc>>, + pub mempool: Map, pub blobs_bundle_pool: Arc>>, } @@ -86,18 +87,18 @@ impl Store { #[cfg(feature = "libmdbx")] EngineType::Libmdbx => Self { engine: Arc::new(LibmdbxStore::new(path)?), - mempool: Arc::new(Mutex::new(HashMap::new())), + mempool: Map::new(), blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, EngineType::InMemory => Self { engine: Arc::new(InMemoryStore::new()), - mempool: Arc::new(Mutex::new(HashMap::new())), + mempool: Map::new(), blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, #[cfg(feature = "redb")] EngineType::RedB => Self { engine: Arc::new(RedBStore::new()?), - mempool: Arc::new(Mutex::new(HashMap::new())), + mempool: Map::new(), blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, }; @@ -278,11 +279,10 @@ impl Store { hash: H256, transaction: MempoolTransaction, ) -> Result<(), StoreError> { - let mut mempool = self - .mempool - .lock() - .map_err(|error| StoreError::Custom(error.to_string()))?; - mempool.insert(hash, transaction); + // let mut mempool = self + // .mempool + // .map_err(|error| StoreError::Custom(error.to_string()))?; + self.mempool.insert(hash, transaction); Ok(()) } @@ -315,11 +315,7 @@ impl Store { /// Remove a transaction from the pool pub fn remove_transaction_from_pool(&self, hash: &H256) -> Result<(), StoreError> { - let mut mempool = self - .mempool - .lock() - .map_err(|error| StoreError::Custom(error.to_string()))?; - if let Some(tx) = mempool.get(hash) { + if let Some(tx) = self.mempool.get(hash) { if matches!(tx.tx_type(), TxType::EIP4844) { self.blobs_bundle_pool .lock() @@ -327,7 +323,7 @@ impl Store { .remove(&tx.compute_hash()); } - mempool.remove(hash); + self.mempool.remove(hash); }; Ok(()) @@ -340,21 +336,14 @@ impl Store { filter: &dyn Fn(&Transaction) -> bool, ) -> Result>, StoreError> { let mut txs_by_sender: HashMap> = HashMap::new(); - let mempool = self - .mempool - .lock() - .map_err(|error| StoreError::Custom(error.to_string()))?; - - for (_, tx) in mempool.iter() { + self.mempool.scan(|_, tx| { if filter(tx) { txs_by_sender .entry(tx.sender()) .or_default() .push(tx.clone()) } - } - - txs_by_sender.iter_mut().for_each(|(_, txs)| txs.sort()); + }); Ok(txs_by_sender) } @@ -363,15 +352,9 @@ impl Store { &self, possible_hashes: &[H256], ) -> Result, StoreError> { - let mempool = self - .mempool - .lock() - .map_err(|error| StoreError::Custom(error.to_string()))?; - - let tx_set: HashSet<_> = mempool.iter().map(|(hash, _)| hash).collect(); Ok(possible_hashes .iter() - .filter(|hash| !tx_set.contains(hash)) + .filter(|hash| self.mempool.get(*hash).is_none()) .copied() .collect()) } From 477d6214021d98cec9057955133d4945ea74ac88 Mon Sep 17 00:00:00 2001 From: fkrause98 Date: Tue, 18 Feb 2025 16:37:19 -0300 Subject: [PATCH 02/17] refactor: rename to concurrent map --- crates/storage/store/storage.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/storage/store/storage.rs b/crates/storage/store/storage.rs index 809b6542651..1a93c51ab31 100644 --- a/crates/storage/store/storage.rs +++ b/crates/storage/store/storage.rs @@ -15,7 +15,7 @@ use ethrex_common::types::{ use ethrex_rlp::decode::RLPDecode; use ethrex_rlp::encode::RLPEncode; use ethrex_trie::{Nibbles, Trie}; -use scc::HashMap as Map; +use scc::HashMap as ConcurrentMap; use serde::{Deserialize, Serialize}; use sha3::{Digest as _, Keccak256}; use std::collections::{HashMap, HashSet}; @@ -36,7 +36,7 @@ pub const MAX_SNAPSHOT_READS: usize = 100; #[derive(Debug, Clone)] pub struct Store { engine: Arc, - pub mempool: Map, + pub mempool: ConcurrentMap, pub blobs_bundle_pool: Arc>>, } @@ -87,18 +87,18 @@ impl Store { #[cfg(feature = "libmdbx")] EngineType::Libmdbx => Self { engine: Arc::new(LibmdbxStore::new(path)?), - mempool: Map::new(), + mempool: ConcurrentMap::new(), blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, EngineType::InMemory => Self { engine: Arc::new(InMemoryStore::new()), - mempool: Map::new(), + mempool: ConcurrentMap::new(), blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, #[cfg(feature = "redb")] EngineType::RedB => Self { engine: Arc::new(RedBStore::new()?), - mempool: Map::new(), + mempool: ConcurrentMap::new(), blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, }; From 5bcc51173be627790948b2eb4cd965905e73b898 Mon Sep 17 00:00:00 2001 From: fkrause98 Date: Wed, 19 Feb 2025 15:21:34 -0300 Subject: [PATCH 03/17] fix: restore sort in transactions --- crates/storage/store/storage.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/storage/store/storage.rs b/crates/storage/store/storage.rs index 8078911adcd..9f96abfdb98 100644 --- a/crates/storage/store/storage.rs +++ b/crates/storage/store/storage.rs @@ -299,7 +299,6 @@ impl Store { // .mempool // .map_err(|error| StoreError::Custom(error.to_string()))?; self.mempool.insert(hash, transaction); - Ok(()) } @@ -360,6 +359,9 @@ impl Store { .push(tx.clone()) } }); + + txs_by_sender.iter_mut().for_each(|(_, txs)| txs.sort()); + Ok(txs_by_sender) } From 22965128b3d966c04f691ff6f8ea171dfc9be2c0 Mon Sep 17 00:00:00 2001 From: fkrause98 Date: Wed, 19 Feb 2025 17:36:32 -0300 Subject: [PATCH 04/17] fix: restore Arc for concurrent map --- crates/storage/store/storage.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/storage/store/storage.rs b/crates/storage/store/storage.rs index 9f96abfdb98..aa8967ed8a8 100644 --- a/crates/storage/store/storage.rs +++ b/crates/storage/store/storage.rs @@ -37,7 +37,7 @@ pub const MAX_SNAPSHOT_READS: usize = 100; #[derive(Debug, Clone)] pub struct Store { engine: Arc, - pub mempool: ConcurrentMap, + pub mempool: Arc>, pub blobs_bundle_pool: Arc>>, } @@ -88,18 +88,18 @@ impl Store { #[cfg(feature = "libmdbx")] EngineType::Libmdbx => Self { engine: Arc::new(LibmdbxStore::new(path)?), - mempool: ConcurrentMap::new(), + mempool: ConcurrentMap::new().into(), blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, EngineType::InMemory => Self { engine: Arc::new(InMemoryStore::new()), - mempool: ConcurrentMap::new(), + mempool: ConcurrentMap::new().into(), blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, #[cfg(feature = "redb")] EngineType::RedB => Self { engine: Arc::new(RedBStore::new()?), - mempool: ConcurrentMap::new(), + mempool: ConcurrentMap::new().into(), blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, }; From 186f32d5129fd6379ee1c51faa327e221dd9e62e Mon Sep 17 00:00:00 2001 From: fkrause98 Date: Wed, 19 Feb 2025 17:52:20 -0300 Subject: [PATCH 05/17] fix: l2 deploy works --- crates/storage/store/storage.rs | 35 +++++++++++---------------------- 1 file changed, 12 insertions(+), 23 deletions(-) diff --git a/crates/storage/store/storage.rs b/crates/storage/store/storage.rs index aa8967ed8a8..f7a72c98c7d 100644 --- a/crates/storage/store/storage.rs +++ b/crates/storage/store/storage.rs @@ -38,7 +38,7 @@ pub const MAX_SNAPSHOT_READS: usize = 100; pub struct Store { engine: Arc, pub mempool: Arc>, - pub blobs_bundle_pool: Arc>>, + pub blobs_bundle_pool: Arc>, } #[allow(dead_code)] @@ -89,18 +89,18 @@ impl Store { EngineType::Libmdbx => Self { engine: Arc::new(LibmdbxStore::new(path)?), mempool: ConcurrentMap::new().into(), - blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), + blobs_bundle_pool: ConcurrentMap::new().into(), }, EngineType::InMemory => Self { engine: Arc::new(InMemoryStore::new()), mempool: ConcurrentMap::new().into(), - blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), + blobs_bundle_pool: ConcurrentMap::new().into(), }, #[cfg(feature = "redb")] EngineType::RedB => Self { engine: Arc::new(RedBStore::new()?), mempool: ConcurrentMap::new().into(), - blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), + blobs_bundle_pool: ConcurrentMap::new().into(), }, }; info!("Started store engine"); @@ -295,10 +295,7 @@ impl Store { hash: H256, transaction: MempoolTransaction, ) -> Result<(), StoreError> { - // let mut mempool = self - // .mempool - // .map_err(|error| StoreError::Custom(error.to_string()))?; - self.mempool.insert(hash, transaction); + self.mempool.entry(hash).or_insert(transaction); Ok(()) } @@ -309,9 +306,8 @@ impl Store { blobs_bundle: BlobsBundle, ) -> Result<(), StoreError> { self.blobs_bundle_pool - .lock() - .map_err(|error| StoreError::Custom(error.to_string()))? - .insert(tx_hash, blobs_bundle); + .entry(tx_hash) + .or_insert(blobs_bundle); Ok(()) } @@ -320,25 +316,18 @@ impl Store { &self, tx_hash: H256, ) -> Result, StoreError> { - Ok(self - .blobs_bundle_pool - .lock() - .map_err(|error| StoreError::Custom(error.to_string()))? - .get(&tx_hash) - .cloned()) + Ok(self.blobs_bundle_pool.read(&tx_hash, |k, v| v.clone())) } /// Remove a transaction from the pool pub fn remove_transaction_from_pool(&self, hash: &H256) -> Result<(), StoreError> { if let Some(tx) = self.mempool.get(hash) { if matches!(tx.tx_type(), TxType::EIP4844) { - self.blobs_bundle_pool - .lock() - .map_err(|error| StoreError::Custom(error.to_string()))? - .remove(&tx.compute_hash()); + if let Some(blob) = self.blobs_bundle_pool.get(&tx.compute_hash()) { + blob.remove(); + } } - - self.mempool.remove(hash); + tx.remove(); }; Ok(()) From 4cf823555cde13c9f8623326abefb99fbc3a8023 Mon Sep 17 00:00:00 2001 From: Mario Rugiero Date: Thu, 20 Feb 2025 15:54:03 -0300 Subject: [PATCH 06/17] perf: log gas-per-second every 30 seconds Quick and dirty implementation: - One atomic, updated after every block; - One getter; - One Tokio task fired by the main function in `ethrex` that queries the value every 30 seconds and logs as `info`. --- cmd/ethrex/ethrex.rs | 28 ++++++++++++++++++++++++++++ crates/blockchain/blockchain.rs | 10 ++++++++++ 2 files changed, 38 insertions(+) diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index 62631a2680e..5d001eb2ce2 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -338,6 +338,34 @@ async fn main() { } } + tokio::spawn(async { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); + let mut since = tokio::time::Instant::now(); + let mut old_counter = ethrex_blockchain::get_gas_counter(); + loop { + let instant = interval.tick().await; + let counter = ethrex_blockchain::get_gas_counter(); + let duration = instant.duration_since(since); + + if duration.as_secs() <= 0 { + continue; + } + + // Wrapping sub is the correct behavior for up to one + // overflow of the global counter since last check. + // More would lead us to underestimate gas usage. + // Note the counter itself also wraps on overflow. + let gas_used = counter.wrapping_sub(old_counter); + + let gps = gas_used / duration.as_secs(); + + info!("[METRIC] GPS: {gps}"); + + old_counter = counter; + since = instant; + } + }); + tokio::select! { _ = tokio::signal::ctrl_c() => { info!("Server shut down started..."); diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 84d4ece7c42..0d344a094fe 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -22,9 +22,17 @@ use ethrex_vm::db::evm_state; use ethrex_vm::EVM_BACKEND; use ethrex_vm::{backends, backends::EVM}; +use std::sync::atomic::{AtomicU64, Ordering}; + //TODO: Implement a struct Chain or BlockChain to encapsulate //functionality and canonical chain state and config +static GAS_COUNTER: AtomicU64 = AtomicU64::new(0); + +pub fn get_gas_counter() -> u64 { + GAS_COUNTER.load(Ordering::Relaxed) +} + /// Adds a new block to the store. It may or may not be canonical, as long as its ancestry links /// with the canonical chain and its parent's post-state is calculated. It doesn't modify the /// canonical chain/head. Fork choice needs to be updated for that in a separate step. @@ -77,6 +85,8 @@ pub fn add_block(block: &Block, storage: &Store) -> Result<(), ChainError> { store_block(storage, block.clone())?; store_receipts(storage, receipts, block_hash)?; + _ = GAS_COUNTER.fetch_add(block.header.gas_used, Ordering::Relaxed); + Ok(()) } From 0169dabf5cf2b48e4b9303cfb009f300b08437ae Mon Sep 17 00:00:00 2001 From: Mario Rugiero Date: Thu, 20 Feb 2025 16:20:15 -0300 Subject: [PATCH 07/17] fix: also print the time spent --- cmd/ethrex/ethrex.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index 5d001eb2ce2..f9072ceef91 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -345,9 +345,9 @@ async fn main() { loop { let instant = interval.tick().await; let counter = ethrex_blockchain::get_gas_counter(); - let duration = instant.duration_since(since); + let duration = instant.duration_since(since).as_secs(); - if duration.as_secs() <= 0 { + if duration <= 0 { continue; } @@ -357,9 +357,9 @@ async fn main() { // Note the counter itself also wraps on overflow. let gas_used = counter.wrapping_sub(old_counter); - let gps = gas_used / duration.as_secs(); + let gps = gas_used / duration; - info!("[METRIC] GPS: {gps}"); + info!("[METRIC] GPS: {gps} INTERVAL: {duration}"); old_counter = counter; since = instant; From 9d2d6066c74889ec6e94ce670b7a024483f334a1 Mon Sep 17 00:00:00 2001 From: Mario Rugiero Date: Thu, 20 Feb 2025 17:44:10 -0300 Subject: [PATCH 08/17] fix: more info - Log throughput at each block, discounting idle time; - Log counter overflows. --- crates/blockchain/blockchain.rs | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 0d344a094fe..a1b5a731a45 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -5,6 +5,12 @@ pub mod mempool; pub mod payload; mod smoke_test; +use std::{ + time::Instant, + sync::atomic::{AtomicU64, Ordering}, +}; +use tracing::info; + use error::{ChainError, InvalidBlockError}; use ethrex_common::constants::GAS_PER_BLOB; use ethrex_common::types::{ @@ -22,8 +28,6 @@ use ethrex_vm::db::evm_state; use ethrex_vm::EVM_BACKEND; use ethrex_vm::{backends, backends::EVM}; -use std::sync::atomic::{AtomicU64, Ordering}; - //TODO: Implement a struct Chain or BlockChain to encapsulate //functionality and canonical chain state and config @@ -39,6 +43,8 @@ pub fn get_gas_counter() -> u64 { /// /// Performs pre and post execution validation, and updates the database with the post state. pub fn add_block(block: &Block, storage: &Store) -> Result<(), ChainError> { + let since = Instant::now(); + let block_hash = block.header.compute_block_hash(); // Validate if it can be the new head and find the parent @@ -85,7 +91,19 @@ pub fn add_block(block: &Block, storage: &Store) -> Result<(), ChainError> { store_block(storage, block.clone())?; store_receipts(storage, receipts, block_hash)?; - _ = GAS_COUNTER.fetch_add(block.header.gas_used, Ordering::Relaxed); + let old_counter = GAS_COUNTER.fetch_add(block.header.gas_used, Ordering::Relaxed); + // Detect overflow, if this happens two or more times between reads they will be + // underestimated by (N-1) * u64::MAX, where N is the number of overflows in that + // period. + if old_counter.checked_add(block.header.gas_used).is_none() { + info!("GAS_COUNTER overflowed"); + } + + let interval = Instant::now().duration_since(since).as_millis(); + if interval != 0 { + let throughput = (block.header.gas_used as u128 / interval) * 1000; + info!("[METRIC] BLOCK THROUGHPUT: {throughput} Gas/s TIME SPENT: {interval} msecs"); + } Ok(()) } From 62a7321629958ea5d616fd99fde475474a6eb422 Mon Sep 17 00:00:00 2001 From: fkrause98 Date: Thu, 20 Feb 2025 18:27:16 -0300 Subject: [PATCH 09/17] feat: add measurement in build payload --- crates/blockchain/payload.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/crates/blockchain/payload.rs b/crates/blockchain/payload.rs index 596deae2f59..974090afeed 100644 --- a/crates/blockchain/payload.rs +++ b/crates/blockchain/payload.rs @@ -1,6 +1,7 @@ use std::{ cmp::{max, Ordering}, collections::HashMap, + time::Instant, }; use ethrex_common::{ @@ -237,6 +238,8 @@ pub fn build_payload( payload: &mut Block, store: &Store, ) -> Result<(BlobsBundle, U256), ChainError> { + let since = Instant::now(); + let limit: u64 = 150000000; debug!("Building payload"); let mut evm_state = evm_state(store.clone(), payload.header.parent_hash); let mut context = PayloadBuildContext::new(payload, &mut evm_state)?; @@ -244,6 +247,15 @@ pub fn build_payload( apply_withdrawals(&mut context)?; fill_transactions(&mut context)?; finalize_payload(&mut context)?; + + let interval = Instant::now().duration_since(since).as_millis(); + tracing::info!("[METRIC] BUILDING PAYLOAD TOOK: {interval} ms"); + let gas_used = limit.checked_add(context.remaining_gas).unwrap(); + if interval != 0 { + let throughput = (gas_used as u128) / interval * 1000; + tracing::info!("[METRIC] BLOCK THROUGHPUT: {throughput} Gas/s TIME SPENT: {interval} msecs"); + } + Ok((context.blobs_bundle, context.block_value)) } From 97fa5466392fd2b94d4a09fd1a6151046c110dfc Mon Sep 17 00:00:00 2001 From: fkrause98 Date: Fri, 21 Feb 2025 13:18:49 -0300 Subject: [PATCH 10/17] feat: log gas used and gas/second --- crates/blockchain/payload.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/crates/blockchain/payload.rs b/crates/blockchain/payload.rs index 974090afeed..1d203d59daa 100644 --- a/crates/blockchain/payload.rs +++ b/crates/blockchain/payload.rs @@ -1,6 +1,7 @@ use std::{ cmp::{max, Ordering}, collections::HashMap, + ops::Div, time::Instant, }; @@ -239,10 +240,11 @@ pub fn build_payload( store: &Store, ) -> Result<(BlobsBundle, U256), ChainError> { let since = Instant::now(); - let limit: u64 = 150000000; + let gas_limit = payload.header.gas_limit; debug!("Building payload"); let mut evm_state = evm_state(store.clone(), payload.header.parent_hash); let mut context = PayloadBuildContext::new(payload, &mut evm_state)?; + apply_system_operations(&mut context)?; apply_withdrawals(&mut context)?; fill_transactions(&mut context)?; @@ -250,10 +252,14 @@ pub fn build_payload( let interval = Instant::now().duration_since(since).as_millis(); tracing::info!("[METRIC] BUILDING PAYLOAD TOOK: {interval} ms"); - let gas_used = limit.checked_add(context.remaining_gas).unwrap(); + let gas_used = gas_limit.checked_add(context.remaining_gas).unwrap(); + let as_gigas = (gas_used as f64).div(10_f64.powf(9_f64)); + if interval != 0 { - let throughput = (gas_used as u128) / interval * 1000; - tracing::info!("[METRIC] BLOCK THROUGHPUT: {throughput} Gas/s TIME SPENT: {interval} msecs"); + let throughput = (as_gigas) / (interval as f64) * 1000_f64; + tracing::info!( + "[METRIC] BLOCK THROUGHPUT: {throughput} Ggas/s TIME SPENT: {interval} msecs" + ); } Ok((context.blobs_bundle, context.block_value)) From 31d01727115c0ae95be4500f0f1627b662864a41 Mon Sep 17 00:00:00 2001 From: fkrause98 Date: Fri, 21 Feb 2025 13:46:01 -0300 Subject: [PATCH 11/17] feat: only log gas throughput --- cmd/ethrex/ethrex.rs | 28 ---------------------------- crates/blockchain/blockchain.rs | 20 +++----------------- crates/blockchain/payload.rs | 3 ++- 3 files changed, 5 insertions(+), 46 deletions(-) diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index f9072ceef91..62631a2680e 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -338,34 +338,6 @@ async fn main() { } } - tokio::spawn(async { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); - let mut since = tokio::time::Instant::now(); - let mut old_counter = ethrex_blockchain::get_gas_counter(); - loop { - let instant = interval.tick().await; - let counter = ethrex_blockchain::get_gas_counter(); - let duration = instant.duration_since(since).as_secs(); - - if duration <= 0 { - continue; - } - - // Wrapping sub is the correct behavior for up to one - // overflow of the global counter since last check. - // More would lead us to underestimate gas usage. - // Note the counter itself also wraps on overflow. - let gas_used = counter.wrapping_sub(old_counter); - - let gps = gas_used / duration; - - info!("[METRIC] GPS: {gps} INTERVAL: {duration}"); - - old_counter = counter; - since = instant; - } - }); - tokio::select! { _ = tokio::signal::ctrl_c() => { info!("Server shut down started..."); diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index a1b5a731a45..fce3b11e8fc 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -6,8 +6,7 @@ pub mod payload; mod smoke_test; use std::{ - time::Instant, - sync::atomic::{AtomicU64, Ordering}, + ops::Div, sync::atomic::{AtomicU64, Ordering}, time::Instant }; use tracing::info; @@ -31,12 +30,6 @@ use ethrex_vm::{backends, backends::EVM}; //TODO: Implement a struct Chain or BlockChain to encapsulate //functionality and canonical chain state and config -static GAS_COUNTER: AtomicU64 = AtomicU64::new(0); - -pub fn get_gas_counter() -> u64 { - GAS_COUNTER.load(Ordering::Relaxed) -} - /// Adds a new block to the store. It may or may not be canonical, as long as its ancestry links /// with the canonical chain and its parent's post-state is calculated. It doesn't modify the /// canonical chain/head. Fork choice needs to be updated for that in a separate step. @@ -91,17 +84,10 @@ pub fn add_block(block: &Block, storage: &Store) -> Result<(), ChainError> { store_block(storage, block.clone())?; store_receipts(storage, receipts, block_hash)?; - let old_counter = GAS_COUNTER.fetch_add(block.header.gas_used, Ordering::Relaxed); - // Detect overflow, if this happens two or more times between reads they will be - // underestimated by (N-1) * u64::MAX, where N is the number of overflows in that - // period. - if old_counter.checked_add(block.header.gas_used).is_none() { - info!("GAS_COUNTER overflowed"); - } - let interval = Instant::now().duration_since(since).as_millis(); if interval != 0 { - let throughput = (block.header.gas_used as u128 / interval) * 1000; + let as_gigas = (block.header.gas_used as f64).div(10_f64.powf(9_f64)); + let throughput = (as_gigas) / (interval as f64) * 1000_f64; info!("[METRIC] BLOCK THROUGHPUT: {throughput} Gas/s TIME SPENT: {interval} msecs"); } diff --git a/crates/blockchain/payload.rs b/crates/blockchain/payload.rs index 1d203d59daa..c245b1261dc 100644 --- a/crates/blockchain/payload.rs +++ b/crates/blockchain/payload.rs @@ -252,7 +252,7 @@ pub fn build_payload( let interval = Instant::now().duration_since(since).as_millis(); tracing::info!("[METRIC] BUILDING PAYLOAD TOOK: {interval} ms"); - let gas_used = gas_limit.checked_add(context.remaining_gas).unwrap(); + if let Some(gas_used) = gas_limit.checked_sub(context.remaining_gas) { let as_gigas = (gas_used as f64).div(10_f64.powf(9_f64)); if interval != 0 { @@ -261,6 +261,7 @@ pub fn build_payload( "[METRIC] BLOCK THROUGHPUT: {throughput} Ggas/s TIME SPENT: {interval} msecs" ); } + } Ok((context.blobs_bundle, context.block_value)) } From 9fc0e6447fca10b4ea350a5c4f83256f9ae78989 Mon Sep 17 00:00:00 2001 From: fkrause98 Date: Fri, 21 Feb 2025 13:54:26 -0300 Subject: [PATCH 12/17] lint: clippy --- crates/blockchain/blockchain.rs | 2 +- crates/storage/store/storage.rs | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index fce3b11e8fc..3d33f226c41 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -6,7 +6,7 @@ pub mod payload; mod smoke_test; use std::{ - ops::Div, sync::atomic::{AtomicU64, Ordering}, time::Instant + ops::Div, time::Instant }; use tracing::info; diff --git a/crates/storage/store/storage.rs b/crates/storage/store/storage.rs index f7a72c98c7d..a44c2e99fda 100644 --- a/crates/storage/store/storage.rs +++ b/crates/storage/store/storage.rs @@ -18,9 +18,8 @@ use ethrex_trie::{Nibbles, Trie}; use scc::HashMap as ConcurrentMap; use serde::{Deserialize, Serialize}; use sha3::{Digest as _, Keccak256}; -use std::collections::{HashMap, HashSet}; -use std::fmt::Debug; -use std::sync::{Arc, Mutex}; +use std::collections::HashMap; +use std::sync::Arc; use tracing::info; mod engines; @@ -316,7 +315,7 @@ impl Store { &self, tx_hash: H256, ) -> Result, StoreError> { - Ok(self.blobs_bundle_pool.read(&tx_hash, |k, v| v.clone())) + Ok(self.blobs_bundle_pool.read(&tx_hash, |_, v| v.clone())) } /// Remove a transaction from the pool @@ -324,10 +323,10 @@ impl Store { if let Some(tx) = self.mempool.get(hash) { if matches!(tx.tx_type(), TxType::EIP4844) { if let Some(blob) = self.blobs_bundle_pool.get(&tx.compute_hash()) { - blob.remove(); + let _removed = blob.remove(); } } - tx.remove(); + let _removed = tx.remove(); }; Ok(()) From c121b92808c9bb5406296304f31c93b04ae9b3a4 Mon Sep 17 00:00:00 2001 From: fkrause98 Date: Fri, 21 Feb 2025 14:59:10 -0300 Subject: [PATCH 13/17] lint: cargo format --- crates/blockchain/blockchain.rs | 4 +--- crates/blockchain/payload.rs | 14 +++++++------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 3d33f226c41..b4dc04d51a1 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -5,9 +5,7 @@ pub mod mempool; pub mod payload; mod smoke_test; -use std::{ - ops::Div, time::Instant -}; +use std::{ops::Div, time::Instant}; use tracing::info; use error::{ChainError, InvalidBlockError}; diff --git a/crates/blockchain/payload.rs b/crates/blockchain/payload.rs index c245b1261dc..a8f66fdbc5d 100644 --- a/crates/blockchain/payload.rs +++ b/crates/blockchain/payload.rs @@ -253,14 +253,14 @@ pub fn build_payload( let interval = Instant::now().duration_since(since).as_millis(); tracing::info!("[METRIC] BUILDING PAYLOAD TOOK: {interval} ms"); if let Some(gas_used) = gas_limit.checked_sub(context.remaining_gas) { - let as_gigas = (gas_used as f64).div(10_f64.powf(9_f64)); + let as_gigas = (gas_used as f64).div(10_f64.powf(9_f64)); - if interval != 0 { - let throughput = (as_gigas) / (interval as f64) * 1000_f64; - tracing::info!( - "[METRIC] BLOCK THROUGHPUT: {throughput} Ggas/s TIME SPENT: {interval} msecs" - ); - } + if interval != 0 { + let throughput = (as_gigas) / (interval as f64) * 1000_f64; + tracing::info!( + "[METRIC] BLOCK THROUGHPUT: {throughput} Ggas/s TIME SPENT: {interval} msecs" + ); + } } Ok((context.blobs_bundle, context.block_value)) From a7d45e922aeadbc2df80922d5d84a5e033c41831 Mon Sep 17 00:00:00 2001 From: Javier Chatruc Date: Mon, 24 Feb 2025 12:45:57 -0300 Subject: [PATCH 14/17] Make both logs say Gigagas instead of Gas/Ggas --- crates/blockchain/blockchain.rs | 2 +- crates/blockchain/payload.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index b4dc04d51a1..8a4d30d277d 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -86,7 +86,7 @@ pub fn add_block(block: &Block, storage: &Store) -> Result<(), ChainError> { if interval != 0 { let as_gigas = (block.header.gas_used as f64).div(10_f64.powf(9_f64)); let throughput = (as_gigas) / (interval as f64) * 1000_f64; - info!("[METRIC] BLOCK THROUGHPUT: {throughput} Gas/s TIME SPENT: {interval} msecs"); + info!("[METRIC] BLOCK THROUGHPUT: {throughput} Gigagas/s TIME SPENT: {interval} msecs"); } Ok(()) diff --git a/crates/blockchain/payload.rs b/crates/blockchain/payload.rs index a8f66fdbc5d..d636622aa3d 100644 --- a/crates/blockchain/payload.rs +++ b/crates/blockchain/payload.rs @@ -258,7 +258,7 @@ pub fn build_payload( if interval != 0 { let throughput = (as_gigas) / (interval as f64) * 1000_f64; tracing::info!( - "[METRIC] BLOCK THROUGHPUT: {throughput} Ggas/s TIME SPENT: {interval} msecs" + "[METRIC] BLOCK THROUGHPUT: {throughput} Gigagas/s TIME SPENT: {interval} msecs" ); } } From f537480faecfed7fe9b7c2bb181b900db062cbfd Mon Sep 17 00:00:00 2001 From: Javier Chatruc Date: Mon, 24 Feb 2025 13:00:27 -0300 Subject: [PATCH 15/17] Show time elapsed for load test binary --- cmd/ethrex_l2/src/commands/test.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/ethrex_l2/src/commands/test.rs b/cmd/ethrex_l2/src/commands/test.rs index 4912e28d671..429d037e962 100644 --- a/cmd/ethrex_l2/src/commands/test.rs +++ b/cmd/ethrex_l2/src/commands/test.rs @@ -7,6 +7,7 @@ use ethrex_l2_sdk::calldata::{self, Value}; use ethrex_rpc::clients::eth::{eth_sender::Overrides, EthClient}; use keccak_hash::keccak; use secp256k1::SecretKey; +use std::time::Instant; use std::{ fs::File, io::{self, BufRead}, @@ -206,6 +207,7 @@ impl Command { println!("Sending to: {to_address:#x}"); + let now = Instant::now(); let mut threads = vec![]; for pk in lines.map_while(Result::ok) { let thread = tokio::spawn(transfer_from( @@ -226,6 +228,8 @@ impl Command { } println!("Total retries: {retries}"); + println!("Total time elapsed: {:.2?}", now.elapsed()); + Ok(()) } } From 8cdb4cea3c3185c0e89b61dadfc9cf7518941245 Mon Sep 17 00:00:00 2001 From: Javier Chatruc Date: Mon, 24 Feb 2025 13:05:09 -0300 Subject: [PATCH 16/17] Differentiate between block building and block execution when loggin --- crates/blockchain/blockchain.rs | 2 +- crates/blockchain/payload.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 8a4d30d277d..255665a38f5 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -86,7 +86,7 @@ pub fn add_block(block: &Block, storage: &Store) -> Result<(), ChainError> { if interval != 0 { let as_gigas = (block.header.gas_used as f64).div(10_f64.powf(9_f64)); let throughput = (as_gigas) / (interval as f64) * 1000_f64; - info!("[METRIC] BLOCK THROUGHPUT: {throughput} Gigagas/s TIME SPENT: {interval} msecs"); + info!("[METRIC] BLOCK EXECUTION THROUGHPUT: {throughput} Gigagas/s TIME SPENT: {interval} msecs"); } Ok(()) diff --git a/crates/blockchain/payload.rs b/crates/blockchain/payload.rs index d636622aa3d..69f180d73b6 100644 --- a/crates/blockchain/payload.rs +++ b/crates/blockchain/payload.rs @@ -258,7 +258,7 @@ pub fn build_payload( if interval != 0 { let throughput = (as_gigas) / (interval as f64) * 1000_f64; tracing::info!( - "[METRIC] BLOCK THROUGHPUT: {throughput} Gigagas/s TIME SPENT: {interval} msecs" + "[METRIC] BLOCK BUILDING THROUGHPUT: {throughput} Gigagas/s TIME SPENT: {interval} msecs" ); } } From e53d733f99b9f6399cb3f89c00811777ad052e6b Mon Sep 17 00:00:00 2001 From: Javier Chatruc Date: Mon, 24 Feb 2025 13:14:23 -0300 Subject: [PATCH 17/17] Revert concurrency map changes --- Cargo.lock | 5 +-- crates/storage/store/Cargo.toml | 2 +- crates/storage/store/store.rs | 75 ++++++++++++++++++++++----------- 3 files changed, 54 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca6431dbdb4..66ff2fb3d6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2849,7 +2849,6 @@ dependencies = [ "hex-literal", "libmdbx", "redb", - "scc", "serde", "serde_json", "sha3", @@ -6889,9 +6888,9 @@ dependencies = [ [[package]] name = "scc" -version = "2.3.3" +version = "2.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea091f6cac2595aa38993f04f4ee692ed43757035c36e67c180b6828356385b1" +checksum = "66b202022bb57c049555430e11fc22fea12909276a80a4c3d368da36ac1d88ed" dependencies = [ "sdd", ] diff --git a/crates/storage/store/Cargo.toml b/crates/storage/store/Cargo.toml index b28f129ee9c..fcace9632bd 100644 --- a/crates/storage/store/Cargo.toml +++ b/crates/storage/store/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" ethrex-rlp.workspace = true ethrex-common.workspace = true ethrex-trie.workspace = true -scc = "2.3.3" + ethereum-types.workspace = true anyhow = "1.0.86" bytes.workspace = true diff --git a/crates/storage/store/store.rs b/crates/storage/store/store.rs index f050d76351e..5f9ce663463 100644 --- a/crates/storage/store/store.rs +++ b/crates/storage/store/store.rs @@ -16,11 +16,11 @@ use ethrex_common::types::{ use ethrex_rlp::decode::RLPDecode; use ethrex_rlp::encode::RLPEncode; use ethrex_trie::{Nibbles, Trie}; -use scc::HashMap as ConcurrentMap; use serde::{Deserialize, Serialize}; use sha3::{Digest as _, Keccak256}; -use std::collections::HashMap; -use std::sync::Arc; +use std::collections::{HashMap, HashSet}; +use std::fmt::Debug; +use std::sync::{Arc, Mutex}; use tracing::info; /// Number of state trie segments to fetch concurrently during state sync @@ -32,8 +32,8 @@ pub const MAX_SNAPSHOT_READS: usize = 100; #[derive(Debug, Clone)] pub struct Store { engine: Arc, - pub mempool: Arc>, - pub blobs_bundle_pool: Arc>, + pub mempool: Arc>>, + pub blobs_bundle_pool: Arc>>, } #[allow(dead_code)] @@ -83,19 +83,19 @@ impl Store { #[cfg(feature = "libmdbx")] EngineType::Libmdbx => Self { engine: Arc::new(LibmdbxStore::new(path)?), - mempool: ConcurrentMap::new().into(), - blobs_bundle_pool: ConcurrentMap::new().into(), + mempool: Arc::new(Mutex::new(HashMap::new())), + blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, EngineType::InMemory => Self { engine: Arc::new(InMemoryStore::new()), - mempool: ConcurrentMap::new().into(), - blobs_bundle_pool: ConcurrentMap::new().into(), + mempool: Arc::new(Mutex::new(HashMap::new())), + blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, #[cfg(feature = "redb")] EngineType::RedB => Self { engine: Arc::new(RedBStore::new()?), - mempool: ConcurrentMap::new().into(), - blobs_bundle_pool: ConcurrentMap::new().into(), + mempool: Arc::new(Mutex::new(HashMap::new())), + blobs_bundle_pool: Arc::new(Mutex::new(HashMap::new())), }, }; info!("Started store engine"); @@ -290,7 +290,12 @@ impl Store { hash: H256, transaction: MempoolTransaction, ) -> Result<(), StoreError> { - self.mempool.entry(hash).or_insert(transaction); + let mut mempool = self + .mempool + .lock() + .map_err(|error| StoreError::Custom(error.to_string()))?; + mempool.insert(hash, transaction); + Ok(()) } @@ -301,8 +306,9 @@ impl Store { blobs_bundle: BlobsBundle, ) -> Result<(), StoreError> { self.blobs_bundle_pool - .entry(tx_hash) - .or_insert(blobs_bundle); + .lock() + .map_err(|error| StoreError::Custom(error.to_string()))? + .insert(tx_hash, blobs_bundle); Ok(()) } @@ -311,18 +317,29 @@ impl Store { &self, tx_hash: H256, ) -> Result, StoreError> { - Ok(self.blobs_bundle_pool.read(&tx_hash, |_, v| v.clone())) + Ok(self + .blobs_bundle_pool + .lock() + .map_err(|error| StoreError::Custom(error.to_string()))? + .get(&tx_hash) + .cloned()) } /// Remove a transaction from the pool pub fn remove_transaction_from_pool(&self, hash: &H256) -> Result<(), StoreError> { - if let Some(tx) = self.mempool.get(hash) { + let mut mempool = self + .mempool + .lock() + .map_err(|error| StoreError::Custom(error.to_string()))?; + if let Some(tx) = mempool.get(hash) { if matches!(tx.tx_type(), TxType::EIP4844) { - if let Some(blob) = self.blobs_bundle_pool.get(&tx.compute_hash()) { - let _removed = blob.remove(); - } + self.blobs_bundle_pool + .lock() + .map_err(|error| StoreError::Custom(error.to_string()))? + .remove(&tx.compute_hash()); } - let _removed = tx.remove(); + + mempool.remove(hash); }; Ok(()) @@ -335,17 +352,21 @@ impl Store { filter: &dyn Fn(&Transaction) -> bool, ) -> Result>, StoreError> { let mut txs_by_sender: HashMap> = HashMap::new(); - self.mempool.scan(|_, tx| { + let mempool = self + .mempool + .lock() + .map_err(|error| StoreError::Custom(error.to_string()))?; + + for (_, tx) in mempool.iter() { if filter(tx) { txs_by_sender .entry(tx.sender()) .or_default() .push(tx.clone()) } - }); + } txs_by_sender.iter_mut().for_each(|(_, txs)| txs.sort()); - Ok(txs_by_sender) } @@ -354,9 +375,15 @@ impl Store { &self, possible_hashes: &[H256], ) -> Result, StoreError> { + let mempool = self + .mempool + .lock() + .map_err(|error| StoreError::Custom(error.to_string()))?; + + let tx_set: HashSet<_> = mempool.iter().map(|(hash, _)| hash).collect(); Ok(possible_hashes .iter() - .filter(|hash| self.mempool.get(*hash).is_none()) + .filter(|hash| !tx_set.contains(hash)) .copied() .collect()) }