Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::btree_map::Entry, sync::Arc};
use binprot::BinProtRead;
use mina_p2p_messages::{gossip, v2};
use openmina_core::{block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, Substate};
use redux::Dispatcher;
use redux::{Dispatcher, Timestamp};

use crate::{
channels::{snark::P2pChannelsSnarkAction, transaction::P2pChannelsTransactionAction},
Expand All @@ -28,7 +28,7 @@ impl P2pNetworkPubsubState {
Action: crate::P2pActionTrait<State>,
{
let pubsub_state = state_context.get_substate_mut()?;
let (action, _meta) = action.split();
let (action, meta) = action.split();

match action {
P2pNetworkPubsubAction::NewStream {
Expand Down Expand Up @@ -125,7 +125,7 @@ impl P2pNetworkPubsubState {
seen_limit,
..
} => {
pubsub_state.reduce_incoming_data(&peer_id, data)?;
pubsub_state.reduce_incoming_data(&peer_id, data, meta.time())?;

let dispatcher: &mut Dispatcher<Action, State> = state_context.into_dispatcher();
dispatcher.push(P2pNetworkPubsubEffectfulAction::IncomingData {
Expand Down Expand Up @@ -447,7 +447,12 @@ impl P2pNetworkPubsubState {
Ok(())
}

fn reduce_incoming_data(&mut self, peer_id: &PeerId, data: Data) -> Result<(), String> {
fn reduce_incoming_data(
&mut self,
peer_id: &PeerId,
data: Data,
timestamp: Timestamp,
) -> Result<(), String> {
let Some(state) = self.clients.get_mut(peer_id) else {
// TODO: investigate, cannot reproduce this
// bug_condition!("State not found for action: P2pNetworkPubsubAction::IncomingData");
Expand Down Expand Up @@ -529,14 +534,19 @@ impl P2pNetworkPubsubState {
}
}

for ihave in &control.ihave {
let message_ids = ihave
.message_ids
.iter()
.filter(|msg_id| !self.mcache.map.contains_key(*msg_id))
.cloned()
.collect();
if let Some(client) = self.clients.get_mut(peer_id) {
for ihave in control.ihave {
if self.clients.contains_key(peer_id) {
let message_ids = ihave
.message_ids
.into_iter()
.filter(|message_id| self.filter_iwant_message_ids(message_id, timestamp))
.collect::<Vec<_>>();

let Some(client) = self.clients.get_mut(peer_id) else {
bug_condition!("State not found for {}", peer_id);
return Ok(());
};

let ctr = client.message.control.get_or_insert_with(Default::default);
ctr.iwant.push(pb::ControlIWant { message_ids })
}
Expand Down
61 changes: 60 additions & 1 deletion p2p/src/network/pubsub/p2p_network_pubsub_state.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use super::pb;
use crate::{token::BroadcastAlgorithm, ConnectionAddr, PeerId, StreamId};

use std::collections::{BTreeMap, VecDeque};
use std::{
collections::{BTreeMap, VecDeque},
time::Duration,
};

use mina_p2p_messages::v2;
use openmina_core::{snark::Snark, transaction::Transaction};
use redux::Timestamp;
use serde::{Deserialize, Serialize};

pub const IWANT_TIMEOUT_DURATION: Duration = Duration::from_secs(5);

#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct P2pNetworkPubsubState {
pub clients: BTreeMap<PeerId, P2pNetworkPubsubClientState>,
Expand All @@ -18,12 +24,65 @@ pub struct P2pNetworkPubsubState {
pub incoming_transactions: Vec<(Transaction, u32)>,
pub incoming_snarks: Vec<(Snark, u32)>,
pub topics: BTreeMap<String, BTreeMap<PeerId, P2pNetworkPubsubClientTopicState>>,
pub iwant: VecDeque<P2pNetworkPubsubIwantRequestCount>,
}

#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct P2pNetworkPubsubIwantRequestCount {
pub message_id: Vec<u8>,
pub count: Vec<Timestamp>,
}

impl P2pNetworkPubsubState {
pub fn prune_peer_state(&mut self, peer_id: &PeerId) {
self.clients.remove(peer_id);
}

pub fn filter_iwant_message_ids(&mut self, message_id: &Vec<u8>, timestamp: Timestamp) -> bool {
if self.mcache.map.contains_key(message_id) {
return false;
}

let message_count = self
.iwant
.iter_mut()
.find(|message| &message.message_id == message_id);

match message_count {
Some(message) => {
let message_counts = std::mem::take(&mut message.count);

message.count = message_counts
.into_iter()
.filter(|time| {
timestamp
.checked_sub(*time)
.map_or(false, |duration| duration < IWANT_TIMEOUT_DURATION)
})
.collect();

if message.count.len() < 3 {
message.count.push(timestamp);
return true;
}

false
}
None => {
let message_count = P2pNetworkPubsubIwantRequestCount {
message_id: message_id.to_owned(),
count: vec![timestamp],
};

self.iwant.push_back(message_count);
if self.iwant.len() > 10 {
self.iwant.pop_front();
}

true
}
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down
Loading