From 9dab173f42d625b230b5a9d44a55fa8a6382e023 Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Mon, 16 Dec 2024 11:57:40 +0400 Subject: [PATCH 1/4] refactor(snark/block_verify): spawn thread only once and send verification requests there instead of spawning a new thread on every verification request --- node/common/src/service/builder.rs | 5 +- node/common/src/service/service.rs | 4 ++ node/common/src/service/snarks.rs | 75 ++++++++++++++++++++++-------- 3 files changed, 64 insertions(+), 20 deletions(-) diff --git a/node/common/src/service/builder.rs b/node/common/src/service/builder.rs index 6b724ae3c3..734f2c05b7 100644 --- a/node/common/src/service/builder.rs +++ b/node/common/src/service/builder.rs @@ -125,8 +125,11 @@ impl NodeServiceCommonBuilder { .chain(b"static") .finalize_xof(), rng: self.rng, - event_sender: self.event_sender, + event_sender: self.event_sender.clone(), event_receiver: self.event_receiver, + snark_block_proof_verify: NodeService::snark_block_proof_verifier_spawn( + self.event_sender, + ), ledger_manager, block_producer: self.block_producer, p2p, diff --git a/node/common/src/service/service.rs b/node/common/src/service/service.rs index a52a4e9255..79265d0c4a 100644 --- a/node/common/src/service/service.rs +++ b/node/common/src/service/service.rs @@ -22,6 +22,7 @@ use super::{ p2p::webrtc_with_libp2p::P2pServiceCtx, replay::ReplayerState, rpc::{RpcSender, RpcService}, + snarks::SnarkBlockVerifyArgs, EventReceiver, EventSender, }; @@ -36,6 +37,8 @@ pub struct NodeService { pub event_sender: EventSender, pub event_receiver: EventReceiver, + pub snark_block_proof_verify: mpsc::UnboundedSender, + pub ledger_manager: LedgerManager, pub block_producer: Option, pub p2p: P2pServiceCtx, @@ -105,6 +108,7 @@ impl NodeService { rng: StdRng::from_seed(rng_seed), event_sender: mpsc::unbounded_channel().0, event_receiver: mpsc::unbounded_channel().1.into(), + snark_block_proof_verify: mpsc::unbounded_channel().0, ledger_manager: LedgerManager::spawn(Default::default()), block_producer: None, p2p: P2pServiceCtx::mocked(p2p_sec_key), diff --git a/node/common/src/service/snarks.rs b/node/common/src/service/snarks.rs index b839d25c27..7d2d496c84 100644 --- a/node/common/src/service/snarks.rs +++ b/node/common/src/service/snarks.rs @@ -11,6 +11,7 @@ use ledger::{ use mina_p2p_messages::v2; use node::{ core::{ + channels::mpsc, snark::{Snark, SnarkJobId}, thread, }, @@ -24,6 +25,54 @@ use rand::prelude::*; use crate::NodeService; +use super::EventSender; + +pub struct SnarkBlockVerifyArgs { + pub req_id: SnarkBlockVerifyId, + pub verifier_index: BlockVerifier, + pub verifier_srs: Arc, + pub block: VerifiableBlockWithHash, +} + +impl NodeService { + pub fn snark_block_proof_verifier_spawn( + event_sender: EventSender, + ) -> mpsc::UnboundedSender { + let (tx, mut rx) = mpsc::unbounded_channel(); + thread::Builder::new() + .name("block_proof_verifier".to_owned()) + .spawn(move || { + while let Some(SnarkBlockVerifyArgs { + req_id, + verifier_index, + verifier_srs, + block, + }) = rx.blocking_recv() + { + eprintln!("verify({}) - start", block.hash_ref()); + let header = block.header_ref(); + let result = { + if !ledger::proofs::verification::verify_block( + header, + &verifier_index, + &verifier_srs, + ) { + Err(SnarkBlockVerifyError::VerificationFailed) + } else { + Ok(()) + } + }; + eprintln!("verify({}) - end", block.hash_ref()); + + let _ = event_sender.send(SnarkEvent::BlockVerify(req_id, result).into()); + } + }) + .expect("failed to spawn block_proof_verifier thread"); + + tx + } +} + impl node::service::SnarkBlockVerifyService for NodeService { fn verify_init( &mut self, @@ -35,25 +84,13 @@ impl node::service::SnarkBlockVerifyService for NodeService { if self.replayer.is_some() { return; } - let tx = self.event_sender().clone(); - thread::spawn(move || { - eprintln!("verify({}) - start", block.hash_ref()); - let header = block.header_ref(); - let result = { - if !ledger::proofs::verification::verify_block( - header, - &verifier_index, - &verifier_srs, - ) { - Err(SnarkBlockVerifyError::VerificationFailed) - } else { - Ok(()) - } - }; - eprintln!("verify({}) - end", block.hash_ref()); - - let _ = tx.send(SnarkEvent::BlockVerify(req_id, result).into()); - }); + let args = SnarkBlockVerifyArgs { + req_id, + verifier_index, + verifier_srs, + block, + }; + let _ = self.snark_block_proof_verify.send(args); } } From 6b89bf098580b59e0569394e1f6e889d51a67076 Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Mon, 16 Dec 2024 13:17:53 +0400 Subject: [PATCH 2/4] feat(node/block_producer): spawn prover thread only once instead of per request --- node/common/src/service/block_producer/mod.rs | 74 +++++++++++++------ 1 file changed, 53 insertions(+), 21 deletions(-) diff --git a/node/common/src/service/block_producer/mod.rs b/node/common/src/service/block_producer/mod.rs index 748fe4983c..ed3d1d7036 100644 --- a/node/common/src/service/block_producer/mod.rs +++ b/node/common/src/service/block_producer/mod.rs @@ -21,18 +21,29 @@ pub struct BlockProducerService { provers: Option, keypair: AccountSecretKey, vrf_evaluation_sender: mpsc::UnboundedSender, + prove_sender: mpsc::UnboundedSender<( + BlockProver, + StateHash, + Box, + )>, } impl BlockProducerService { pub fn new( keypair: AccountSecretKey, vrf_evaluation_sender: mpsc::UnboundedSender, + prove_sender: mpsc::UnboundedSender<( + BlockProver, + StateHash, + Box, + )>, provers: Option, ) -> Self { Self { provers, keypair, vrf_evaluation_sender, + prove_sender, } } @@ -41,22 +52,29 @@ impl BlockProducerService { keypair: AccountSecretKey, provers: Option, ) -> Self { - let (vrf_evaluation_sender, vrf_evaluation_receiver) = - mpsc::unbounded_channel::(); + let (vrf_evaluation_sender, vrf_evaluation_receiver) = mpsc::unbounded_channel(); + let (prove_sender, prove_receiver) = mpsc::unbounded_channel(); + let event_sender_clone = event_sender.clone(); let producer_keypair = keypair.clone(); thread::Builder::new() .name("openmina_vrf_evaluator".to_owned()) .spawn(move || { vrf_evaluator::vrf_evaluator( - event_sender, + event_sender_clone, vrf_evaluation_receiver, producer_keypair.into(), ); }) .unwrap(); - BlockProducerService::new(keypair, vrf_evaluation_sender, provers) + let producer_keypair = keypair.clone(); + thread::Builder::new() + .name("openmina_block_prover".to_owned()) + .spawn(move || prover_loop(producer_keypair, event_sender, prove_receiver)) + .unwrap(); + + BlockProducerService::new(keypair, vrf_evaluation_sender, prove_sender, provers) } pub fn keypair(&self) -> AccountSecretKey { @@ -64,6 +82,31 @@ impl BlockProducerService { } } +fn prover_loop( + keypair: AccountSecretKey, + event_sender: EventSender, + mut rx: mpsc::UnboundedReceiver<( + BlockProver, + StateHash, + Box, + )>, +) { + while let Some((provers, block_hash, input)) = rx.blocking_recv() { + let res = + prove(provers, input.clone(), keypair.clone(), false).map_err(|err| format!("{err:?}")); + if res.is_err() { + // IMPORTANT: Make sure that `input` here is a copy from before `prove` is called, we don't + // want to leak the private key. + if let Err(error) = dump_failed_block_proof_input(block_hash.clone(), input) { + openmina_core::error!( + openmina_core::log::system_time(); + message = "Failure when dumping failed block proof inputs", error = format!("{error}")); + } + } + let _ = event_sender.send(BlockProducerEvent::BlockProve(block_hash, res).into()); + } +} + pub fn prove( provers: BlockProver, mut input: Box, @@ -113,23 +156,12 @@ impl node::service::BlockProducerService for crate::NodeService { return; } let provers = self.provers(); - let keypair = self.block_producer.as_ref().unwrap().keypair(); - - let tx = self.event_sender().clone(); - thread::spawn(move || { - let res = - prove(provers, input.clone(), keypair, false).map_err(|err| format!("{err:?}")); - if res.is_err() { - // IMPORTANT: Make sure that `input` here is a copy from before `prove` is called, we don't - // want to leak the private key. - if let Err(error) = dump_failed_block_proof_input(block_hash.clone(), input) { - openmina_core::error!( - openmina_core::log::system_time(); - message = "Failure when dumping failed block proof inputs", error = format!("{error}")); - } - } - let _ = tx.send(BlockProducerEvent::BlockProve(block_hash, res).into()); - }); + let _ = self + .block_producer + .as_ref() + .expect("prove shouldn't be requested if block producer isn't initialized") + .prove_sender + .send((provers, block_hash, input)); } } From 5e828c24aced5b0a50ff1b758d3d905ee2b7e9b7 Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Mon, 16 Dec 2024 13:21:45 +0400 Subject: [PATCH 3/4] feat(dep/wasm-bindgen): update to 0.2.99 --- .github/workflows/ci.yaml | 2 +- Cargo.lock | 35 ++++++++++++++++++----------------- core/Cargo.toml | 4 ++-- node/common/Cargo.toml | 2 +- p2p/Cargo.toml | 2 +- 5 files changed, 23 insertions(+), 22 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 45bf246eae..68b954677c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -191,7 +191,7 @@ jobs: rustup default nightly rustup component add rustfmt rust-src rustup target add wasm32-unknown-unknown - cargo install -f wasm-bindgen-cli --version 0.2.95 + cargo install -f wasm-bindgen-cli --version 0.2.99 - name: Setup Rust Cache uses: Swatinem/rust-cache@v2 diff --git a/Cargo.lock b/Cargo.lock index 31ae389f5a..e57e64caac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3087,10 +3087,11 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.72" +version = "0.3.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" +checksum = "6717b6b5b077764fb5966237269cb3c64edddde4b14ce42647430a78ced9e7b7" dependencies = [ + "once_cell", "wasm-bindgen", ] @@ -8149,9 +8150,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" +checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396" dependencies = [ "cfg-if", "once_cell", @@ -8160,13 +8161,12 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" +checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79" dependencies = [ "bumpalo", "log", - "once_cell", "proc-macro2", "quote", "syn 2.0.58", @@ -8175,21 +8175,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.43" +version = "0.4.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed" +checksum = "38176d9b44ea84e9184eff0bc34cc167ed044f816accfe5922e54d84cf48eca2" dependencies = [ "cfg-if", "js-sys", + "once_cell", "wasm-bindgen", "web-sys", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" +checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -8197,9 +8198,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" +checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" dependencies = [ "proc-macro2", "quote", @@ -8210,9 +8211,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" +checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6" [[package]] name = "wasm-bindgen-test" @@ -8266,9 +8267,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.72" +version = "0.3.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" +checksum = "04dd7223427d52553d3702c004d3b2fe07c148165faa56313cb00211e31c12bc" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/core/Cargo.toml b/core/Cargo.toml index 9976af2bbd..f1b24df803 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -38,8 +38,8 @@ ark-ff = { workspace = true } redux = { workspace = true, features=["serializable_callbacks"] } [target.'cfg(target_family = "wasm")'.dependencies] -wasm-bindgen = "0.2" -wasm-bindgen-futures = "0.4" +wasm-bindgen = "0.2.99" +wasm-bindgen-futures = "0.4.49" wasm_thread = { version = "0.3", features = [ "es_modules" ] } js-sys = "0.3" web-sys = { version = "0.3", features = ["Window", "Response"] } diff --git a/node/common/Cargo.toml b/node/common/Cargo.toml index e26483f77a..2fbc2f1004 100644 --- a/node/common/Cargo.toml +++ b/node/common/Cargo.toml @@ -26,7 +26,7 @@ openmina-core = { path = "../../core" } [target.'cfg(target_family = "wasm")'.dependencies] redux = { workspace = true } wasm-bindgen = "0.2" -wasm-bindgen-futures = "0.4.42" +wasm-bindgen-futures = "0.4" gloo-timers = { version = "0.3", features = ["futures"] } gloo-utils = "0.2" tracing-wasm = "0.2" diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index e93fff2b7e..fb7d8ec6c4 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -77,7 +77,7 @@ local-ip-address = "0.6.1" [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen = "0.2" -wasm-bindgen-futures = "0.4.42" +wasm-bindgen-futures = "0.4" gloo-timers = { version = "0.3", features = ["futures"] } gloo-utils = "0.2" js-sys = "0.3.64" From af05e926b1138c83705d758ed900867a97da1a5e Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Mon, 16 Dec 2024 13:40:14 +0400 Subject: [PATCH 4/4] feat(node/web/rpc): actions stats --- node/common/src/service/rpc/stats.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/node/common/src/service/rpc/stats.rs b/node/common/src/service/rpc/stats.rs index 4422810ca1..18256da2ee 100644 --- a/node/common/src/service/rpc/stats.rs +++ b/node/common/src/service/rpc/stats.rs @@ -23,6 +23,24 @@ impl Stats { #[cfg(target_family = "wasm")] #[cfg_attr(target_family = "wasm", wasm_bindgen)] impl Stats { + pub async fn actions(&self, id: JsValue) -> Result { + let query = if id.is_null() { + ActionStatsQuery::SinceStart + } else if id.as_string().is_some_and(|s| s == "latest") { + ActionStatsQuery::ForLatestBlock + } else { + let id = id.into_serde().map_err(|err| err.to_string())?; + ActionStatsQuery::ForBlockWithId(id) + }; + + let res = self + .sender + .oneshot_request::(RpcRequest::ActionStatsGet(query)) + .await + .flatten(); + Ok(JsValue::from_serde(&res).unwrap_or_default()) + } + pub async fn sync(&self, limit: Option) -> JsValue { let query = SyncStatsQuery { limit }; let res = self