Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- **Mempool**: Inside the transaction and snark pool reducers, only broadcast locally injected transactions and producer snarks. Libp2p layer takes care of diffs received from gossip already.

## [0.12.0] - 2024-12-04

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ impl ExternalSnarkWorker {
};
let sender = p2p.my_id();
// Directly add snark to the snark pool as it's produced by us.
dispatcher.push(SnarkPoolAction::WorkAdd { snark, sender });
dispatcher.push(SnarkPoolAction::WorkAdd {
snark,
sender,
is_sender_local: true,
});
dispatcher.push(ExternalSnarkWorkerAction::PruneWork);
}
ExternalSnarkWorkerAction::WorkError { error } => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ impl SnarkPoolCandidatesState {
dispatcher.push(SnarkPoolAction::WorkAdd {
snark: snark.clone(),
sender: *peer_id,
is_sender_local: false,
});
}
}
Expand Down
3 changes: 2 additions & 1 deletion node/src/snark_pool/snark_pool_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ pub enum SnarkPoolAction {
commitment: SnarkJobCommitment,
sender: PeerId,
},
#[action_event(level = info)]
#[action_event(level = info, fields(is_sender_local))]
WorkAdd {
snark: Snark,
sender: PeerId,
is_sender_local: bool,
},
#[action_event(level = trace)]
P2pSendAll,
Expand Down
19 changes: 14 additions & 5 deletions node/src/snark_pool/snark_pool_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ impl SnarkPoolState {
}
}
}
SnarkPoolAction::WorkAdd { snark, sender } => {
SnarkPoolAction::WorkAdd {
snark,
sender,
is_sender_local,
} => {
state.set_snark_work(SnarkWork {
work: snark.clone(),
received_t: meta.time(),
Expand All @@ -198,10 +202,15 @@ impl SnarkPoolState {
}
}

dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
snark: snark.clone(),
nonce: 0,
});
// TODO: we only rebroadcast locally produced snarks here.
// libp2p logic already broadcasts everything right now and doesn't
// wait for validation, thad needs to be fixed. See #952
if *is_sender_local {
dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
snark: snark.clone(),
nonce: 0,
});
}
}
SnarkPoolAction::P2pSendAll { .. } => {
let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
Expand Down
3 changes: 3 additions & 0 deletions node/src/transaction_pool/transaction_pool_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ impl redux::EnablingCondition<crate::State> for TransactionPoolAction {
last_index,
)
}),
TransactionPoolAction::Rebroadcast { accepted, rejected } => {
!(accepted.is_empty() && rejected.is_empty())
}
_ => true,
}
}
Expand Down
29 changes: 18 additions & 11 deletions node/src/transaction_pool/transaction_pool_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,16 @@ impl TransactionPoolState {
};

// Note(adonagy): Action for rebroadcast, in his action we can use forget_check
let (rpc_action, accepted, rejected) = match substate.pool.unsafe_apply(
meta.time(),
global_slot_from_genesis,
global_slot,
&diff,
accounts,
is_sender_local,
) {
let (rpc_action, was_accepted, accepted, rejected) = match substate
.pool
.unsafe_apply(
meta.time(),
global_slot_from_genesis,
global_slot,
&diff,
accounts,
is_sender_local,
) {
Ok((ApplyDecision::Accept, accepted, rejected, dropped)) => {
for hash in dropped {
substate.dpool.remove(&hash);
Expand All @@ -238,15 +240,15 @@ impl TransactionPoolState {
rpc_id,
response: accepted.clone(),
});
(rpc_action, accepted, rejected)
(rpc_action, true, accepted, rejected)
}
Ok((ApplyDecision::Reject, accepted, rejected, _)) => {
let rpc_action =
from_rpc.map(|rpc_id| RpcAction::TransactionInjectRejected {
rpc_id,
response: rejected.clone(),
});
(rpc_action, accepted, rejected)
(rpc_action, false, accepted, rejected)
}
Err(e) => {
crate::core::warn!(meta.time(); kind = "TransactionPoolUnsafeApplyError", summary = e);
Expand All @@ -258,7 +260,12 @@ impl TransactionPoolState {
if let Some(rpc_action) = rpc_action {
dispatcher.push(rpc_action);
}
dispatcher.push(TransactionPoolAction::Rebroadcast { accepted, rejected });
// TODO: we only rebroadcast locally injected transactions here.
// libp2p logic already broadcasts everything right now and doesn't
// wait for validation, thad needs to be fixed. See #952
if is_sender_local && was_accepted {
dispatcher.push(TransactionPoolAction::Rebroadcast { accepted, rejected });
}
}
TransactionPoolAction::ApplyTransitionFrontierDiff {
best_tip_hash,
Expand Down
Loading