diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs index ed8eae6802..506c9393d6 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs @@ -3,7 +3,7 @@ use std::{collections::btree_map::Entry, sync::Arc}; use binprot::BinProtRead; use mina_p2p_messages::{gossip, v2}; use openmina_core::{block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, Substate}; -use redux::Dispatcher; +use redux::{Dispatcher, Timestamp}; use crate::{ channels::{snark::P2pChannelsSnarkAction, transaction::P2pChannelsTransactionAction}, @@ -28,7 +28,7 @@ impl P2pNetworkPubsubState { Action: crate::P2pActionTrait, { let pubsub_state = state_context.get_substate_mut()?; - let (action, _meta) = action.split(); + let (action, meta) = action.split(); match action { P2pNetworkPubsubAction::NewStream { @@ -125,7 +125,7 @@ impl P2pNetworkPubsubState { seen_limit, .. } => { - pubsub_state.reduce_incoming_data(&peer_id, data)?; + pubsub_state.reduce_incoming_data(&peer_id, data, meta.time())?; let dispatcher: &mut Dispatcher = state_context.into_dispatcher(); dispatcher.push(P2pNetworkPubsubEffectfulAction::IncomingData { @@ -447,7 +447,12 @@ impl P2pNetworkPubsubState { Ok(()) } - fn reduce_incoming_data(&mut self, peer_id: &PeerId, data: Data) -> Result<(), String> { + fn reduce_incoming_data( + &mut self, + peer_id: &PeerId, + data: Data, + timestamp: Timestamp, + ) -> Result<(), String> { let Some(state) = self.clients.get_mut(peer_id) else { // TODO: investigate, cannot reproduce this // bug_condition!("State not found for action: P2pNetworkPubsubAction::IncomingData"); @@ -529,14 +534,19 @@ impl P2pNetworkPubsubState { } } - for ihave in &control.ihave { - let message_ids = ihave - .message_ids - .iter() - .filter(|msg_id| !self.mcache.map.contains_key(*msg_id)) - .cloned() - .collect(); - if let Some(client) = self.clients.get_mut(peer_id) { + for ihave in control.ihave { + if self.clients.contains_key(peer_id) { + let message_ids = ihave + .message_ids + .into_iter() + .filter(|message_id| self.filter_iwant_message_ids(message_id, timestamp)) + .collect::>(); + + let Some(client) = self.clients.get_mut(peer_id) else { + bug_condition!("State not found for {}", peer_id); + return Ok(()); + }; + let ctr = client.message.control.get_or_insert_with(Default::default); ctr.iwant.push(pb::ControlIWant { message_ids }) } diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs index df750de320..e3abeaa971 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_state.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_state.rs @@ -1,12 +1,18 @@ use super::pb; use crate::{token::BroadcastAlgorithm, ConnectionAddr, PeerId, StreamId}; -use std::collections::{BTreeMap, VecDeque}; +use std::{ + collections::{BTreeMap, VecDeque}, + time::Duration, +}; use mina_p2p_messages::v2; use openmina_core::{snark::Snark, transaction::Transaction}; +use redux::Timestamp; use serde::{Deserialize, Serialize}; +pub const IWANT_TIMEOUT_DURATION: Duration = Duration::from_secs(5); + #[derive(Default, Serialize, Deserialize, Debug, Clone)] pub struct P2pNetworkPubsubState { pub clients: BTreeMap, @@ -18,12 +24,65 @@ pub struct P2pNetworkPubsubState { pub incoming_transactions: Vec<(Transaction, u32)>, pub incoming_snarks: Vec<(Snark, u32)>, pub topics: BTreeMap>, + pub iwant: VecDeque, +} + +#[derive(Default, Serialize, Deserialize, Debug, Clone)] +pub struct P2pNetworkPubsubIwantRequestCount { + pub message_id: Vec, + pub count: Vec, } impl P2pNetworkPubsubState { pub fn prune_peer_state(&mut self, peer_id: &PeerId) { self.clients.remove(peer_id); } + + pub fn filter_iwant_message_ids(&mut self, message_id: &Vec, timestamp: Timestamp) -> bool { + if self.mcache.map.contains_key(message_id) { + return false; + } + + let message_count = self + .iwant + .iter_mut() + .find(|message| &message.message_id == message_id); + + match message_count { + Some(message) => { + let message_counts = std::mem::take(&mut message.count); + + message.count = message_counts + .into_iter() + .filter(|time| { + timestamp + .checked_sub(*time) + .map_or(false, |duration| duration < IWANT_TIMEOUT_DURATION) + }) + .collect(); + + if message.count.len() < 3 { + message.count.push(timestamp); + return true; + } + + false + } + None => { + let message_count = P2pNetworkPubsubIwantRequestCount { + message_id: message_id.to_owned(), + count: vec![timestamp], + }; + + self.iwant.push_back(message_count); + if self.iwant.len() > 10 { + self.iwant.pop_front(); + } + + true + } + } + } } #[derive(Serialize, Deserialize, Debug, Clone)]