From 5f915d14fee51e10ff2c4983f224d8cf1ec588c3 Mon Sep 17 00:00:00 2001 From: Daniel Kuehr Date: Sat, 27 Apr 2024 13:22:00 -0400 Subject: [PATCH 1/4] Replace bincode serialization for cbor --- Cargo.lock | 43 ++++++++++++++++++++++++++++++++---- Cargo.toml | 1 + ledger/Cargo.toml | 2 +- ledger/src/proofs/caching.rs | 17 ++++---------- node/Cargo.toml | 4 ++-- node/account/Cargo.toml | 1 - node/src/recorder/mod.rs | 20 ++++++++--------- node/testing/Cargo.toml | 3 +-- snark/Cargo.toml | 2 +- 9 files changed, 59 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 71698dfc4e..9dfc866a75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2198,6 +2198,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "gensym" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "913dce4c5f06c2ea40fc178c06f777ac89fc6b1383e90c254fafb1abe4ba3c82" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.58", + "uuid 1.5.0", +] + [[package]] name = "getopts" version = "0.2.21" @@ -3353,6 +3365,26 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linkme" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb2cfee0de9bd869589fb9a015e155946d1be5ff415cb844c2caccc6cc4b5db9" +dependencies = [ + "linkme-impl", +] + +[[package]] +name = "linkme-impl" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adf157a4dc5a29b7b464aa8fe7edeff30076e07e13646a1c3874f58477dc99f8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.58", +] + [[package]] name = "linux-raw-sys" version = "0.3.8" @@ -3602,7 +3634,6 @@ dependencies = [ "ark-serialize", "backtrace", "base64 0.13.1", - "bincode", "bitvec", "blake2", "bs58 0.4.0", @@ -3631,6 +3662,7 @@ dependencies = [ "rand_seeder", "rayon", "serde", + "serde_cbor", "serde_json", "serde_with 3.7.0", "sha2 0.10.8", @@ -3923,9 +3955,9 @@ name = "node" version = "0.4.0" dependencies = [ "anyhow", - "bincode", "derive_more", "lazy_static", + "linkme", "mina-hasher", "mina-p2p-messages", "mina-signer", @@ -3938,6 +3970,7 @@ dependencies = [ "regex", "rust-format", "serde", + "serde_cbor", "serde_json", "serde_with 3.7.0", "snark", @@ -4299,7 +4332,6 @@ dependencies = [ "anyhow", "argon2", "base64 0.22.0", - "bincode", "bs58 0.4.0", "crypto_secretbox", "hex", @@ -4363,7 +4395,6 @@ version = "0.4.0" dependencies = [ "anyhow", "axum", - "bincode", "chrono", "clap 4.5.2", "console", @@ -5231,6 +5262,9 @@ version = "0.1.0" source = "git+https://github.com/openmina/redux-rs.git?branch=feat/enabling-condition-with-time#9826c5223104acdd547c36c27966a4be89b34b17" dependencies = [ "enum_dispatch", + "gensym", + "linkme", + "paste", "serde", "wasm-timer", ] @@ -5969,6 +6003,7 @@ dependencies = [ "derive_more", "hex", "kimchi", + "linkme", "mina-curves", "mina-hasher", "mina-p2p-messages", diff --git a/Cargo.toml b/Cargo.toml index a93d7817ea..c87ce38e91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ redux = { git = "https://github.com/openmina/redux-rs.git", branch = "feat/enabl serde = "1.0.190" serde_json = "1.0.107" serde_with = { version = "3.7.0", features = ["hex"] } +linkme = "0.3.22" [profile.fuzz] inherits = "release" diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index 132379e568..c78a52a9f3 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -59,7 +59,7 @@ uuid = { version = "1", features = [ "v4" ] } serde = { version = "1.0", features = ["rc"] } serde_json = { version = "1.0", features = ["float_roundtrip"] } -bincode = "1.3.3" +serde_cbor = "0.11.2" backtrace = "0.3" derive_more = "0.99.17" diff --git a/ledger/src/proofs/caching.rs b/ledger/src/proofs/caching.rs index b195931224..e09eddb737 100644 --- a/ledger/src/proofs/caching.rs +++ b/ledger/src/proofs/caching.rs @@ -413,17 +413,12 @@ impl From<&VerifierIndexCached> for VerifierIndex { } pub fn verifier_index_to_bytes(verifier: &VerifierIndex) -> Vec { - const NBYTES: usize = 5328359; - let verifier: VerifierIndexCached = verifier.into(); - let mut bytes = Vec::with_capacity(NBYTES); - bincode::serialize_into(&mut bytes, &verifier).unwrap(); - - bytes + serde_cbor::to_vec(&verifier).unwrap() } pub fn verifier_index_from_bytes(bytes: &[u8]) -> VerifierIndex { - let verifier: VerifierIndexCached = bincode::deserialize(bytes).unwrap(); + let verifier: VerifierIndexCached = serde_cbor::from_slice(bytes).unwrap(); (&verifier).into() } @@ -434,13 +429,9 @@ where BigInt: From<&'a ::ScalarField>, BigInt: From<&'a ::BaseField>, { - const NBYTES: usize = 5308593; - let srs: SRSCached = srs.into(); - let mut bytes = Vec::with_capacity(NBYTES); - bincode::serialize_into(&mut bytes, &srs).unwrap(); - bytes + serde_cbor::to_vec(&srs).unwrap() } pub fn srs_from_bytes(bytes: &[u8]) -> SRS @@ -448,6 +439,6 @@ where G: CommitmentCurve, G: for<'a> From<&'a GroupAffineCached>, { - let srs: SRSCached = bincode::deserialize(bytes).unwrap(); + let srs: SRSCached = serde_cbor::from_slice(bytes).unwrap(); (&srs).into() } diff --git a/node/Cargo.toml b/node/Cargo.toml index cfb1cd8a63..13fa09ba50 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -16,19 +16,19 @@ thiserror = "1.0.37" time = { version = "0.3.36", features = ["parsing"] } anyhow = "1.0.70" num_enum = "0.5.7" -bincode = "1.3.3" redux = { workspace = true } mina-hasher = { workspace = true } mina-signer = { workspace = true } ledger = { workspace = true } mina-p2p-messages = { workspace = true } vrf = { workspace = true } - +linkme = "0.3.22" openmina-core = { path = "../core" } snark = { path = "../snark" } p2p = { path = "../p2p" } openmina-node-account = { path = "./account" } tokio = { version = "1.26.0" } +serde_cbor = "0.11.2" [build-dependencies] regex = "1" diff --git a/node/account/Cargo.toml b/node/account/Cargo.toml index 9225fbae88..3a719e1dbc 100644 --- a/node/account/Cargo.toml +++ b/node/account/Cargo.toml @@ -10,7 +10,6 @@ serde_json = { version = "1.0.82", features = ["unbounded_depth", "arbitrary_pre thiserror = "1.0.37" anyhow = "1.0.70" bs58 = "0.4.0" -bincode = "1.3.3" hex = "0.4.3" rand = "0.8" lazy_static = "1.4.0" diff --git a/node/src/recorder/mod.rs b/node/src/recorder/mod.rs index 7e2c928e5d..1db6c11dce 100644 --- a/node/src/recorder/mod.rs +++ b/node/src/recorder/mod.rs @@ -15,12 +15,12 @@ use serde::{Deserialize, Serialize}; use crate::{Action, ActionKind, ActionWithMeta, State}; fn initial_state_path>(path: P) -> PathBuf { - path.as_ref().join("initial_state.bincode") + path.as_ref().join("initial_state.cbor") } fn actions_path>(path: P, file_index: usize) -> PathBuf { path.as_ref() - .join(format!("actions_{}.bincode", file_index)) + .join(format!("actions_{}.cbor", file_index)) } #[derive(Serialize, Deserialize)] @@ -30,12 +30,12 @@ pub struct RecordedInitialState<'a> { } impl<'a> RecordedInitialState<'a> { - pub fn write_to(&self, writer: &mut W) -> bincode::Result<()> { - bincode::serialize_into(writer, self) + pub fn write_to(&self, writer: &mut W) -> serde_cbor::Result<()> { + serde_cbor::to_writer(writer, self) } - pub fn decode(encoded: &[u8]) -> bincode::Result { - bincode::deserialize(encoded) + pub fn decode(encoded: &[u8]) -> serde_cbor::Result { + serde_cbor::from_slice(encoded) } } @@ -47,12 +47,12 @@ pub struct RecordedActionWithMeta<'a> { } impl<'a> RecordedActionWithMeta<'a> { - pub fn encode(&self) -> bincode::Result> { - bincode::serialize(self) + pub fn encode(&self) -> serde_cbor::Result> { + serde_cbor::to_vec(self) } - pub fn decode(encoded: &[u8]) -> bincode::Result { - bincode::deserialize(encoded) + pub fn decode(encoded: &[u8]) -> serde_cbor::Result { + serde_cbor::from_slice(encoded) } pub fn as_action_with_meta(self) -> Result { diff --git a/node/testing/Cargo.toml b/node/testing/Cargo.toml index 3d29e3d890..88e03cdbf7 100644 --- a/node/testing/Cargo.toml +++ b/node/testing/Cargo.toml @@ -18,9 +18,8 @@ derive_more = "0.99.17" serde = "1.0.147" serde_json = { version = "1.0.82", features = ["unbounded_depth", "arbitrary_precision"] } thiserror = "1.0.37" -serde_cbor = "0.11.2" anyhow = "1.0.70" -bincode = "1.3.3" +serde_cbor = "0.11.2" rand = "0.8" tokio = { version = "1.26.0" } num_cpus = "1.0" diff --git a/snark/Cargo.toml b/snark/Cargo.toml index 0bbb8c7d34..594c061c31 100644 --- a/snark/Cargo.toml +++ b/snark/Cargo.toml @@ -29,7 +29,7 @@ sha2 = "0.10" num-bigint = "0.4" bincode = "1.3.3" thiserror = "1.0.50" - +linkme = "0.3.22" rand = "0.8" rayon = "1.5" From 4ca447b310191f2bcb093ba5b1e1fd33c58b84fb Mon Sep 17 00:00:00 2001 From: Daniel Kuehr Date: Tue, 30 Apr 2024 14:21:14 -0400 Subject: [PATCH 2/4] testing: replace getrandom calls in CryptoService --- Cargo.lock | 15 --------------- node/native/src/service.rs | 14 ++++++++------ 2 files changed, 8 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9dfc866a75..e8be9c418b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2198,18 +2198,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "gensym" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "913dce4c5f06c2ea40fc178c06f777ac89fc6b1383e90c254fafb1abe4ba3c82" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.58", - "uuid 1.5.0", -] - [[package]] name = "getopts" version = "0.2.21" @@ -5262,9 +5250,6 @@ version = "0.1.0" source = "git+https://github.com/openmina/redux-rs.git?branch=feat/enabling-condition-with-time#9826c5223104acdd547c36c27966a4be89b34b17" dependencies = [ "enum_dispatch", - "gensym", - "linkme", - "paste", "serde", "wasm-timer", ] diff --git a/node/native/src/service.rs b/node/native/src/service.rs index 9c85f42131..96cd3d03d5 100644 --- a/node/native/src/service.rs +++ b/node/native/src/service.rs @@ -116,17 +116,19 @@ impl P2pCryptoService for NodeService { fn ephemeral_sk(&mut self) -> [u8; 32] { // TODO: make deterministic // TODO: make network debugger to use seed to derive the same key - let mut r = [0; 32]; - getrandom::getrandom(&mut r).unwrap(); - r + //let mut r = [0; 32]; + //getrandom::getrandom(&mut r).unwrap(); + //r + self.rng.gen() } fn static_sk(&mut self) -> [u8; 32] { // TODO: make deterministic // TODO: make network debugger to use seed to derive the same key - let mut r = [0; 32]; - getrandom::getrandom(&mut r).unwrap(); - r + //let mut r = [0; 32]; + //getrandom::getrandom(&mut r).unwrap(); + //r + self.rng.gen() } fn sign_key(&mut self, key: &[u8; 32]) -> Vec { From 061c1c52dc1d20703ca8a02dba4d02419b61640b Mon Sep 17 00:00:00 2001 From: Daniel Kuehr Date: Thu, 2 May 2024 09:02:15 -0400 Subject: [PATCH 3/4] replay: fix invariant checks --- core/src/block/block_with_hash.rs | 6 +++--- .../only_syncs_to_better_blocks.rs | 14 ++++++++------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/block/block_with_hash.rs b/core/src/block/block_with_hash.rs index 7b559e1d9b..bfb26e320f 100644 --- a/core/src/block/block_with_hash.rs +++ b/core/src/block/block_with_hash.rs @@ -109,11 +109,11 @@ impl> BlockWithHash { } pub fn is_genesis(&self) -> bool { - self.height() - == CONSTRAINT_CONSTANTS + self.height() == 1 + || CONSTRAINT_CONSTANTS .fork .as_ref() - .map_or(1, |fork| fork.previous_length + 1) + .map_or(false, |fork| fork.previous_length + 1 == self.height()) } pub fn root_block_height(&self) -> u32 { diff --git a/node/invariants/src/transition_frontier/only_syncs_to_better_blocks.rs b/node/invariants/src/transition_frontier/only_syncs_to_better_blocks.rs index 6be8a1712d..548d045896 100644 --- a/node/invariants/src/transition_frontier/only_syncs_to_better_blocks.rs +++ b/node/invariants/src/transition_frontier/only_syncs_to_better_blocks.rs @@ -60,12 +60,14 @@ impl Invariant for TransitionFrontierOnlySyncsToBetterBlocks { match (target_best_tip, best_tip) { (Some(target_best_tip), Some(best_tip)) => { checked = true; - if !consensus_take( - best_tip.consensus_state(), - target_best_tip.consensus_state(), - best_tip.hash(), - target_best_tip.hash(), - ) { + if !best_tip.is_genesis() + && !consensus_take( + best_tip.consensus_state(), + target_best_tip.consensus_state(), + best_tip.hash(), + target_best_tip.hash(), + ) + { return InvariantResult::Violation(format!("best tip target not better than current best tip!\nprev({}): {}\nnew({}): {}", best_tip.hash(), serde_json::to_string(best_tip.consensus_state()).unwrap(), From 655635d067859b5ec33396814d80e3d0dbb7bbde Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Tue, 14 May 2024 11:20:14 +0400 Subject: [PATCH 4/4] fix(replayer): non-determinism because of race condition caused by async staged ledger reconstruction re: #52 --- node/native/src/service.rs | 4 ++ node/src/ledger/ledger_manager.rs | 34 +++++++--- node/src/ledger/ledger_service.rs | 109 ++++++++++++++++++++++-------- node/src/recorder/mod.rs | 3 +- 4 files changed, 108 insertions(+), 42 deletions(-) diff --git a/node/native/src/service.rs b/node/native/src/service.rs index 96cd3d03d5..d2cfe7f2a1 100644 --- a/node/native/src/service.rs +++ b/node/native/src/service.rs @@ -85,6 +85,10 @@ impl LedgerService for NodeService { fn ledger_manager(&self) -> &LedgerManager { &self.ledger_manager } + + fn force_sync_calls(&self) -> bool { + self.replayer.is_some() + } } impl redux::TimeService for NodeService { diff --git a/node/src/ledger/ledger_manager.rs b/node/src/ledger/ledger_manager.rs index 673f0eecc2..a4cfaedbb8 100644 --- a/node/src/ledger/ledger_manager.rs +++ b/node/src/ledger/ledger_manager.rs @@ -71,22 +71,36 @@ pub enum LedgerResponse { } impl LedgerRequest { - fn handle(self, ledger_ctx: &mut LedgerCtx, caller: &LedgerCaller) -> LedgerResponse { + fn handle( + self, + ledger_ctx: &mut LedgerCtx, + caller: &LedgerCaller, + force_sync: bool, + ) -> LedgerResponse { match self { Self::Write(request) => LedgerResponse::Write(match request { LedgerWriteRequest::StagedLedgerReconstruct { snarked_ledger_hash, parts, } => { - let caller = caller.clone(); - let cb = move |staged_ledger_hash, result| { - caller.call(LedgerRequest::StagedLedgerReconstructResult { + if !force_sync { + let caller = caller.clone(); + let cb = move |staged_ledger_hash, result| { + caller.call(LedgerRequest::StagedLedgerReconstructResult { + staged_ledger_hash, + result, + }) + }; + ledger_ctx.staged_ledger_reconstruct(snarked_ledger_hash, parts, cb); + return LedgerResponse::Success; + } else { + let (staged_ledger_hash, result) = + ledger_ctx.staged_ledger_reconstruct_sync(snarked_ledger_hash, parts); + LedgerWriteResponse::StagedLedgerReconstruct { staged_ledger_hash, result, - }) - }; - ledger_ctx.staged_ledger_reconstruct(snarked_ledger_hash, parts, cb); - return LedgerResponse::Success; + } + } } LedgerWriteRequest::StagedLedgerDiffCreate { pred_block, @@ -275,7 +289,7 @@ impl LedgerManager { let join_handle = thread::spawn(move || { while let Some(LedgerRequestWithChan { request, responder }) = receiver.blocking_recv() { - let response = request.handle(&mut ledger_ctx, &ledger_caller); + let response = request.handle(&mut ledger_ctx, &ledger_caller, responder.is_some()); match (response, responder) { (LedgerResponse::Write(resp), None) => { ledger_ctx.send_write_response(resp); @@ -309,7 +323,7 @@ impl LedgerManager { self.caller.call(request) } - fn call_sync( + pub(super) fn call_sync( &self, request: LedgerRequest, ) -> Result { diff --git a/node/src/ledger/ledger_service.rs b/node/src/ledger/ledger_service.rs index a131228745..fa20b7c770 100644 --- a/node/src/ledger/ledger_service.rs +++ b/node/src/ledger/ledger_service.rs @@ -456,39 +456,40 @@ impl LedgerCtx { ) where F: 'static + FnOnce(v2::LedgerHash, Result) + Send, { - let staged_ledger_hash = parts - .as_ref() - .map(|p| p.staged_ledger_hash.clone()) - .unwrap_or_else(|| snarked_ledger_hash.clone()); - let snarked_ledger = self.sync.snarked_ledger_mut(snarked_ledger_hash.clone()); - let mask = snarked_ledger.copy(); + let snarked_ledger = self + .sync + .snarked_ledger_mut(snarked_ledger_hash.clone()) + .copy(); std::thread::spawn(move || { - let result = if let Some(parts) = parts { - let states = parts - .needed_blocks - .iter() - .map(|state| (state.hash().to_fp().unwrap(), state.clone())) - .collect::>(); - - StagedLedger::of_scan_state_pending_coinbases_and_snarked_ledger( - (), - &CONSTRAINT_CONSTANTS, - Verifier, - (&parts.scan_state).into(), - mask, - LocalState::empty(), - parts.staged_ledger_hash.0.to_field(), - (&parts.pending_coinbase).into(), - |key| states.get(&key).cloned().unwrap(), - ) - } else { - StagedLedger::create_exn(CONSTRAINT_CONSTANTS.clone(), mask) - }; + let (staged_ledger_hash, result) = + staged_ledger_reconstruct(snarked_ledger, snarked_ledger_hash, parts); callback(staged_ledger_hash, result); }); } + pub fn staged_ledger_reconstruct_sync( + &mut self, + snarked_ledger_hash: LedgerHash, + parts: Option>, + ) -> (v2::LedgerHash, Result<(), String>) { + let snarked_ledger = self + .sync + .snarked_ledger_mut(snarked_ledger_hash.clone()) + .copy(); + let (staged_ledger_hash, result) = + staged_ledger_reconstruct(snarked_ledger, snarked_ledger_hash, parts); + let result = match result { + Err(err) => Err(err), + Ok(staged_ledger) => { + self.staged_ledger_reconstruct_result_store(staged_ledger); + Ok(()) + } + }; + + (staged_ledger_hash, result) + } + pub fn block_apply( &mut self, block: ArcBlockWithHash, @@ -497,7 +498,7 @@ impl LedgerCtx { openmina_core::info!(openmina_core::log::system_time(); kind = "LedgerService::block_apply", summary = format!("{}, {} <- {}", block.height(), block.hash(), block.pred_hash()), - snarked_ledger_hash = block.snarked_ledger_hash().to_string(), + pred_staged_ledger_hash = pred_block.staged_ledger_hash().to_string(), staged_ledger_hash = block.staged_ledger_hash().to_string(), ); let mut staged_ledger = self @@ -1038,15 +1039,63 @@ impl LedgerSyncState { } } +fn staged_ledger_reconstruct( + snarked_ledger: Mask, + snarked_ledger_hash: LedgerHash, + parts: Option>, +) -> (v2::LedgerHash, Result) { + let staged_ledger_hash = parts + .as_ref() + .map(|p| p.staged_ledger_hash.clone()) + .unwrap_or_else(|| snarked_ledger_hash.clone()); + + let result = if let Some(parts) = parts { + let states = parts + .needed_blocks + .iter() + .map(|state| (state.hash().to_fp().unwrap(), state.clone())) + .collect::>(); + + StagedLedger::of_scan_state_pending_coinbases_and_snarked_ledger( + (), + &CONSTRAINT_CONSTANTS, + Verifier, + (&parts.scan_state).into(), + snarked_ledger, + LocalState::empty(), + parts.staged_ledger_hash.0.to_field(), + (&parts.pending_coinbase).into(), + |key| states.get(&key).cloned().unwrap(), + ) + } else { + StagedLedger::create_exn(CONSTRAINT_CONSTANTS.clone(), snarked_ledger) + }; + + (staged_ledger_hash, result) +} + pub trait LedgerService: redux::Service { fn ledger_manager(&self) -> &LedgerManager; + fn force_sync_calls(&self) -> bool { + false + } fn write_init(&mut self, request: LedgerWriteRequest) { - self.ledger_manager().call(LedgerRequest::Write(request)); + let request = LedgerRequest::Write(request); + if self.force_sync_calls() { + let _ = self.ledger_manager().call_sync(request); + } else { + self.ledger_manager().call(request); + } } fn read_init(&mut self, id: LedgerReadId, request: LedgerReadRequest) { - self.ledger_manager().call(LedgerRequest::Read(id, request)); + let request = LedgerRequest::Read(id, request); + if self.force_sync_calls() { + let _ = self.ledger_manager().call_sync(request); + } else { + self.ledger_manager().call(request); + } } } diff --git a/node/src/recorder/mod.rs b/node/src/recorder/mod.rs index 1db6c11dce..ff5c8b63ef 100644 --- a/node/src/recorder/mod.rs +++ b/node/src/recorder/mod.rs @@ -19,8 +19,7 @@ fn initial_state_path>(path: P) -> PathBuf { } fn actions_path>(path: P, file_index: usize) -> PathBuf { - path.as_ref() - .join(format!("actions_{}.cbor", file_index)) + path.as_ref().join(format!("actions_{}.cbor", file_index)) } #[derive(Serialize, Deserialize)]