diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ff4a9cf2d..d2419b2835 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- **Mempool**: Inside the transaction and snark pool reducers, only broadcast locally injected transactions and producer snarks. Libp2p layer takes care of diffs received from gossip already. + ## [0.12.0] - 2024-12-04 ### Fixed diff --git a/node/src/external_snark_worker/external_snark_worker_reducer.rs b/node/src/external_snark_worker/external_snark_worker_reducer.rs index 6ad5c9677c..3d93afc3c8 100644 --- a/node/src/external_snark_worker/external_snark_worker_reducer.rs +++ b/node/src/external_snark_worker/external_snark_worker_reducer.rs @@ -125,7 +125,11 @@ impl ExternalSnarkWorker { }; let sender = p2p.my_id(); // Directly add snark to the snark pool as it's produced by us. - dispatcher.push(SnarkPoolAction::WorkAdd { snark, sender }); + dispatcher.push(SnarkPoolAction::WorkAdd { + snark, + sender, + is_sender_local: true, + }); dispatcher.push(ExternalSnarkWorkerAction::PruneWork); } ExternalSnarkWorkerAction::WorkError { error } => { diff --git a/node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs b/node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs index 97968688a8..c11769c5fd 100644 --- a/node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs +++ b/node/src/snark_pool/candidate/snark_pool_candidate_reducer.rs @@ -169,6 +169,7 @@ impl SnarkPoolCandidatesState { dispatcher.push(SnarkPoolAction::WorkAdd { snark: snark.clone(), sender: *peer_id, + is_sender_local: false, }); } } diff --git a/node/src/snark_pool/snark_pool_actions.rs b/node/src/snark_pool/snark_pool_actions.rs index b06e9796d0..2ecb579494 100644 --- a/node/src/snark_pool/snark_pool_actions.rs +++ b/node/src/snark_pool/snark_pool_actions.rs @@ -33,10 +33,11 @@ pub enum SnarkPoolAction { commitment: SnarkJobCommitment, sender: PeerId, }, - #[action_event(level = info)] + #[action_event(level = info, fields(is_sender_local))] WorkAdd { snark: Snark, sender: PeerId, + is_sender_local: bool, }, #[action_event(level = trace)] P2pSendAll, diff --git a/node/src/snark_pool/snark_pool_reducer.rs b/node/src/snark_pool/snark_pool_reducer.rs index 22312cc523..b8d4c6f6ad 100644 --- a/node/src/snark_pool/snark_pool_reducer.rs +++ b/node/src/snark_pool/snark_pool_reducer.rs @@ -171,7 +171,11 @@ impl SnarkPoolState { } } } - SnarkPoolAction::WorkAdd { snark, sender } => { + SnarkPoolAction::WorkAdd { + snark, + sender, + is_sender_local, + } => { state.set_snark_work(SnarkWork { work: snark.clone(), received_t: meta.time(), @@ -198,10 +202,15 @@ impl SnarkPoolState { } } - dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast { - snark: snark.clone(), - nonce: 0, - }); + // TODO: we only rebroadcast locally produced snarks here. + // libp2p logic already broadcasts everything right now and doesn't + // wait for validation, thad needs to be fixed. See #952 + if *is_sender_local { + dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast { + snark: snark.clone(), + nonce: 0, + }); + } } SnarkPoolAction::P2pSendAll { .. } => { let (dispatcher, global_state) = state_context.into_dispatcher_and_state(); diff --git a/node/src/transaction_pool/transaction_pool_actions.rs b/node/src/transaction_pool/transaction_pool_actions.rs index 6b6c952e1c..b6a2116eee 100644 --- a/node/src/transaction_pool/transaction_pool_actions.rs +++ b/node/src/transaction_pool/transaction_pool_actions.rs @@ -107,6 +107,9 @@ impl redux::EnablingCondition for TransactionPoolAction { last_index, ) }), + TransactionPoolAction::Rebroadcast { accepted, rejected } => { + !(accepted.is_empty() && rejected.is_empty()) + } _ => true, } } diff --git a/node/src/transaction_pool/transaction_pool_reducer.rs b/node/src/transaction_pool/transaction_pool_reducer.rs index b84e32fc2a..f983745108 100644 --- a/node/src/transaction_pool/transaction_pool_reducer.rs +++ b/node/src/transaction_pool/transaction_pool_reducer.rs @@ -215,14 +215,16 @@ impl TransactionPoolState { }; // Note(adonagy): Action for rebroadcast, in his action we can use forget_check - let (rpc_action, accepted, rejected) = match substate.pool.unsafe_apply( - meta.time(), - global_slot_from_genesis, - global_slot, - &diff, - accounts, - is_sender_local, - ) { + let (rpc_action, was_accepted, accepted, rejected) = match substate + .pool + .unsafe_apply( + meta.time(), + global_slot_from_genesis, + global_slot, + &diff, + accounts, + is_sender_local, + ) { Ok((ApplyDecision::Accept, accepted, rejected, dropped)) => { for hash in dropped { substate.dpool.remove(&hash); @@ -238,7 +240,7 @@ impl TransactionPoolState { rpc_id, response: accepted.clone(), }); - (rpc_action, accepted, rejected) + (rpc_action, true, accepted, rejected) } Ok((ApplyDecision::Reject, accepted, rejected, _)) => { let rpc_action = @@ -246,7 +248,7 @@ impl TransactionPoolState { rpc_id, response: rejected.clone(), }); - (rpc_action, accepted, rejected) + (rpc_action, false, accepted, rejected) } Err(e) => { crate::core::warn!(meta.time(); kind = "TransactionPoolUnsafeApplyError", summary = e); @@ -258,7 +260,12 @@ impl TransactionPoolState { if let Some(rpc_action) = rpc_action { dispatcher.push(rpc_action); } - dispatcher.push(TransactionPoolAction::Rebroadcast { accepted, rejected }); + // TODO: we only rebroadcast locally injected transactions here. + // libp2p logic already broadcasts everything right now and doesn't + // wait for validation, thad needs to be fixed. See #952 + if is_sender_local && was_accepted { + dispatcher.push(TransactionPoolAction::Rebroadcast { accepted, rejected }); + } } TransactionPoolAction::ApplyTransitionFrontierDiff { best_tip_hash,