From ec037674181c716e592138c38552dd2d1b3e6506 Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Wed, 4 Dec 2024 15:23:40 +0400 Subject: [PATCH 1/2] feat(p2p): introduce random peer disconnections --- Cargo.lock | 1 + core/Cargo.toml | 1 + core/src/lib.rs | 6 ++++ node/src/action_kind.rs | 4 ++- node/src/state.rs | 2 +- p2p/src/disconnection/mod.rs | 2 ++ .../p2p_disconnection_actions.rs | 22 +++++++++--- .../p2p_disconnection_reducer.rs | 34 +++++++++++++++++-- p2p/src/p2p_config.rs | 5 +++ p2p/src/p2p_reducer.rs | 3 +- p2p/src/p2p_state.rs | 11 ++++++ 11 files changed, 82 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1a9ac14ee6..df64e849fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4582,6 +4582,7 @@ dependencies = [ "openmina-fuzzer", "openmina-macros", "poseidon", + "rand", "redux", "serde", "serde_json", diff --git a/core/Cargo.toml b/core/Cargo.toml index 7d1af1ad45..9976af2bbd 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -14,6 +14,7 @@ tracing = { version = "0.1", features = ["std"] } sha2 = "0.10.6" binprot = { git = "https://github.com/openmina/binprot-rs", rev = "2b5a909" } binprot_derive = { git = "https://github.com/openmina/binprot-rs", rev = "2b5a909" } +rand = "0.8.0" redux = { workspace = true } tokio = { version = "1.26", features = ["sync"] } time = { version = "0.3", features = ["formatting", "macros", "parsing"] } diff --git a/core/src/lib.rs b/core/src/lib.rs index 42755a1cab..2ae805537e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -52,6 +52,12 @@ mod work_dir { pub use work_dir::{get_debug_dir, get_work_dir, set_work_dir}; +use rand::prelude::*; +#[inline(always)] +pub fn pseudo_rng(time: redux::Timestamp) -> StdRng { + StdRng::seed_from_u64(time.into()) +} + pub fn preshared_key(chain_id: &ChainId) -> [u8; 32] { use multihash::Hasher; let mut hasher = Blake2b256::default(); diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index 1cea0afb15..5175b0ad14 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -343,6 +343,7 @@ pub enum ActionKind { P2pDisconnectionFinish, P2pDisconnectionInit, P2pDisconnectionPeerClosed, + P2pDisconnectionRandomTry, P2pDisconnectionEffectfulInit, P2pEffectfulInitialize, P2pIdentifyNewRequest, @@ -705,7 +706,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 595; + pub const COUNT: u16 = 596; } impl std::fmt::Display for ActionKind { @@ -1212,6 +1213,7 @@ impl ActionKindGet for P2pConnectionAction { impl ActionKindGet for P2pDisconnectionAction { fn kind(&self) -> ActionKind { match self { + Self::RandomTry => ActionKind::P2pDisconnectionRandomTry, Self::Init { .. } => ActionKind::P2pDisconnectionInit, Self::PeerClosed { .. } => ActionKind::P2pDisconnectionPeerClosed, Self::FailedCleanup { .. } => ActionKind::P2pDisconnectionFailedCleanup, diff --git a/node/src/state.rs b/node/src/state.rs index 94f749920b..4be20a956c 100644 --- a/node/src/state.rs +++ b/node/src/state.rs @@ -309,7 +309,7 @@ impl State { } pub fn pseudo_rng(&self) -> StdRng { - StdRng::seed_from_u64(self.time().into()) + crate::core::pseudo_rng(self.time()) } /// Must be called in the global reducer as the last thing only once diff --git a/p2p/src/disconnection/mod.rs b/p2p/src/disconnection/mod.rs index 36e6519dea..29debe3d77 100644 --- a/p2p/src/disconnection/mod.rs +++ b/p2p/src/disconnection/mod.rs @@ -15,6 +15,8 @@ use crate::{ #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, thiserror::Error)] pub enum P2pDisconnectionReason { + #[error("random disconnection in order to free up space so that more peers can connect")] + FreeUpSpace, #[error("message is unexpected for channel {0}")] P2pChannelMsgUnexpected(ChannelId), #[error("failed to send message to channel: {0}")] diff --git a/p2p/src/disconnection/p2p_disconnection_actions.rs b/p2p/src/disconnection/p2p_disconnection_actions.rs index 86946f62b2..d265f552db 100644 --- a/p2p/src/disconnection/p2p_disconnection_actions.rs +++ b/p2p/src/disconnection/p2p_disconnection_actions.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use openmina_core::ActionEvent; use serde::{Deserialize, Serialize}; @@ -6,9 +8,12 @@ use crate::{P2pPeerStatus, P2pState, PeerId}; pub type P2pDisconnectionActionWithMetaRef<'a> = redux::ActionWithMeta<&'a P2pDisconnectionAction>; +const RANDOM_DISCONNECTION_TRY_FREQUENCY: Duration = Duration::from_secs(10); + #[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)] #[action_event(level = debug)] pub enum P2pDisconnectionAction { + RandomTry, /// Initialize disconnection. #[action_event(fields(display(peer_id), display(reason)), level = info)] Init { @@ -17,17 +22,26 @@ pub enum P2pDisconnectionAction { }, /// Peer disconnection. #[action_event(fields(display(peer_id)), level = info)] - PeerClosed { peer_id: PeerId }, + PeerClosed { + peer_id: PeerId, + }, #[action_event(fields(display(peer_id)), level = info)] - FailedCleanup { peer_id: PeerId }, + FailedCleanup { + peer_id: PeerId, + }, /// Finish disconnecting from a peer. #[action_event(fields(display(peer_id)), level = debug)] - Finish { peer_id: PeerId }, + Finish { + peer_id: PeerId, + }, } impl redux::EnablingCondition for P2pDisconnectionAction { - fn is_enabled(&self, state: &P2pState, _time: redux::Timestamp) -> bool { + fn is_enabled(&self, state: &P2pState, time: redux::Timestamp) -> bool { match self { + P2pDisconnectionAction::RandomTry => time + .checked_sub(state.last_random_disconnection_try) + .map_or(false, |dur| dur >= RANDOM_DISCONNECTION_TRY_FREQUENCY), P2pDisconnectionAction::Init { peer_id, .. } | P2pDisconnectionAction::PeerClosed { peer_id, .. } | P2pDisconnectionAction::Finish { peer_id } => { diff --git a/p2p/src/disconnection/p2p_disconnection_reducer.rs b/p2p/src/disconnection/p2p_disconnection_reducer.rs index 517aa5d9ff..48ba92098a 100644 --- a/p2p/src/disconnection/p2p_disconnection_reducer.rs +++ b/p2p/src/disconnection/p2p_disconnection_reducer.rs @@ -1,4 +1,7 @@ -use openmina_core::{bug_condition, Substate}; +use std::time::Duration; + +use openmina_core::{bug_condition, pseudo_rng, Substate}; +use rand::prelude::*; use redux::ActionWithMeta; use crate::{ @@ -6,7 +9,10 @@ use crate::{ P2pPeerAction, P2pPeerStatus, P2pState, }; -use super::{P2pDisconnectedState, P2pDisconnectionAction}; +use super::{P2pDisconnectedState, P2pDisconnectionAction, P2pDisconnectionReason}; + +/// Do not disconnect peer for this duration just for freeing up peer space. +const FORCE_PEER_STABLE_FOR: Duration = Duration::from_secs(90); impl P2pDisconnectedState { pub fn reducer( @@ -21,6 +27,30 @@ impl P2pDisconnectedState { let p2p_state = state_context.get_substate_mut()?; match action { + P2pDisconnectionAction::RandomTry => { + p2p_state.last_random_disconnection_try = meta.time(); + if p2p_state.config.limits.max_stable_peers() + >= p2p_state.ready_peers_iter().count() + { + return Ok(()); + } + let mut rng = pseudo_rng(meta.time()); + + let peer_id = p2p_state + .ready_peers_iter() + .filter(|(_, s)| s.connected_for(meta.time()) > FORCE_PEER_STABLE_FOR) + .map(|(id, _)| *id) + .choose(&mut rng); + + if let Some(peer_id) = peer_id { + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pDisconnectionAction::Init { + peer_id, + reason: P2pDisconnectionReason::FreeUpSpace, + }); + } + Ok(()) + } P2pDisconnectionAction::Init { peer_id, reason } => { #[cfg(feature = "p2p-libp2p")] if p2p_state.is_libp2p_peer(&peer_id) { diff --git a/p2p/src/p2p_config.rs b/p2p/src/p2p_config.rs index 5206168321..a7d4854aa1 100644 --- a/p2p/src/p2p_config.rs +++ b/p2p/src/p2p_config.rs @@ -332,6 +332,11 @@ impl P2pLimits { min_peers(&self): self.max_peers.map(|v| (v / 2).max(3).min(v)) ); + limit!( + /// Above this limit, peers will be randomly disconnected to free up space. + max_stable_peers(&self): self.max_peers.map(|v| v.saturating_mul(8).saturating_div(10)) + ); + limit!( /// Maximum number of connections. max_connections(&self): self.max_peers.map(|v| v + 10) diff --git a/p2p/src/p2p_reducer.rs b/p2p/src/p2p_reducer.rs index e200dbd275..3e89d5550f 100644 --- a/p2p/src/p2p_reducer.rs +++ b/p2p/src/p2p_reducer.rs @@ -7,7 +7,7 @@ use crate::{ incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction, P2pConnectionState, }, - disconnection::P2pDisconnectedState, + disconnection::{P2pDisconnectedState, P2pDisconnectionAction}, P2pAction, P2pNetworkKadKey, P2pNetworkKademliaAction, P2pNetworkPnetAction, P2pNetworkRpcAction, P2pNetworkSelectAction, P2pNetworkState, P2pPeerState, P2pState, PeerId, }; @@ -81,6 +81,7 @@ impl P2pState { state.p2p_connection_timeouts_dispatch(dispatcher, time)?; dispatcher.push(P2pConnectionOutgoingAction::RandomInit); + dispatcher.push(P2pDisconnectionAction::RandomTry); state.p2p_try_reconnect_disconnected_peers(dispatcher, time)?; state.p2p_discovery(dispatcher, time)?; diff --git a/p2p/src/p2p_state.rs b/p2p/src/p2p_state.rs index e8a683ae82..813add9424 100644 --- a/p2p/src/p2p_state.rs +++ b/p2p/src/p2p_state.rs @@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize}; use std::{ collections::{BTreeMap, BTreeSet}, sync::Arc, + time::Duration, }; use crate::{ @@ -43,6 +44,9 @@ pub struct P2pState { pub config: P2pConfig, pub network: P2pNetworkState, pub peers: BTreeMap, + + pub last_random_disconnection_try: redux::Timestamp, + pub callbacks: P2pCallbacks, } @@ -120,6 +124,9 @@ impl P2pState { config, network, peers, + + last_random_disconnection_try: redux::Timestamp::ZERO, + callbacks, } } @@ -477,6 +484,10 @@ impl P2pPeerStatusReady { best_tip: None, } } + + pub fn connected_for(&self, now: redux::Timestamp) -> Duration { + now.checked_sub(self.connected_since).unwrap_or_default() + } } impl SubstateAccess for P2pState { From 9885a7cd3a6ddb1c293bc9456a189340ac37e26a Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Mon, 9 Dec 2024 19:07:39 +0400 Subject: [PATCH 2/2] feat(cli/p2p): max_peers arg --- cli/src/commands/node/mod.rs | 4 ++ node/native/src/node/builder.rs | 81 +++++++++++++++++---------------- 2 files changed, 45 insertions(+), 40 deletions(-) diff --git a/cli/src/commands/node/mod.rs b/cli/src/commands/node/mod.rs index 1f837271ce..8a5e6d9d4c 100644 --- a/cli/src/commands/node/mod.rs +++ b/cli/src/commands/node/mod.rs @@ -75,6 +75,9 @@ pub struct Node { #[arg(long, env)] pub peer_list_url: Option, + #[arg(long, default_value = "100")] + pub max_peers: usize, + /// Run the node in seed mode. No default peers will be added. #[arg(long, env)] pub seed: bool, @@ -206,6 +209,7 @@ impl Node { .filter_map(|s| s.parse().ok()), ); + node_builder.p2p_max_peers(self.max_peers); self.seed.then(|| node_builder.p2p_seed_node()); self.no_peers_discovery .then(|| node_builder.p2p_no_discovery()); diff --git a/node/native/src/node/builder.rs b/node/native/src/node/builder.rs index 069021e4d1..55507f672a 100644 --- a/node/native/src/node/builder.rs +++ b/node/native/src/node/builder.rs @@ -4,6 +4,7 @@ use std::{ net::IpAddr, path::Path, sync::Arc, + time::Duration, }; use anyhow::Context; @@ -34,13 +35,10 @@ pub struct NodeBuilder { rng_seed: [u8; 32], custom_initial_time: Option, genesis_config: Arc, + p2p: P2pConfig, p2p_sec_key: Option, - p2p_libp2p_port: Option, p2p_is_seed: bool, - p2p_no_discovery: bool, p2p_is_started: bool, - initial_peers: Vec, - external_addrs: Vec, block_producer: Option, snarker: Option, service: NodeServiceBuilder, @@ -68,13 +66,25 @@ impl NodeBuilder { rng_seed, custom_initial_time: None, genesis_config, + p2p: P2pConfig { + libp2p_port: None, + listen_port: None, + // Must be replaced with builder api. + identity_pub_key: P2pSecretKey::deterministic(0).public_key(), + initial_peers: Vec::new(), + external_addrs: Vec::new(), + enabled_channels: ChannelId::iter_all().collect(), + peer_discovery: true, + meshsub: P2pMeshsubConfig { + initial_time: Duration::ZERO, + ..Default::default() + }, + timeouts: P2pTimeouts::default(), + limits: P2pLimits::default().with_max_peers(Some(100)), + }, p2p_sec_key: None, - p2p_libp2p_port: None, p2p_is_seed: false, - p2p_no_discovery: false, p2p_is_started: false, - initial_peers: Vec::new(), - external_addrs: Vec::new(), block_producer: None, snarker: None, service: NodeServiceBuilder::new(rng_seed), @@ -94,12 +104,13 @@ impl NodeBuilder { /// If not called, random one will be generated and used instead. pub fn p2p_sec_key(&mut self, key: P2pSecretKey) -> &mut Self { + self.p2p.identity_pub_key = key.public_key(); self.p2p_sec_key = Some(key); self } pub fn p2p_libp2p_port(&mut self, port: u16) -> &mut Self { - self.p2p_libp2p_port = Some(port); + self.p2p.libp2p_port = Some(port); self } @@ -110,7 +121,7 @@ impl NodeBuilder { } pub fn p2p_no_discovery(&mut self) -> &mut Self { - self.p2p_no_discovery = true; + self.p2p.peer_discovery = false; self } @@ -119,19 +130,19 @@ impl NodeBuilder { &mut self, peers: impl IntoIterator, ) -> &mut Self { - self.initial_peers.extend(peers); + self.p2p.initial_peers.extend(peers); self } pub fn external_addrs(&mut self, v: impl Iterator) -> &mut Self { - self.external_addrs.extend(v); + self.p2p.external_addrs.extend(v); self } /// Extend p2p initial peers from file. pub fn initial_peers_from_file(&mut self, path: impl AsRef) -> anyhow::Result<&mut Self> { peers_from_reader( - &mut self.initial_peers, + &mut self.p2p.initial_peers, File::open(&path).context(anyhow::anyhow!( "opening peer list file {:?}", path.as_ref() @@ -152,7 +163,7 @@ impl NodeBuilder { ) -> anyhow::Result<&mut Self> { let url = url.into_url().context("failed to parse peers url")?; peers_from_reader( - &mut self.initial_peers, + &mut self.p2p.initial_peers, reqwest::blocking::get(url.clone()) .context(anyhow::anyhow!("reading peer list url {url}"))?, ) @@ -160,6 +171,11 @@ impl NodeBuilder { Ok(self) } + pub fn p2p_max_peers(&mut self, limit: usize) -> &mut Self { + self.p2p.limits = self.p2p.limits.with_max_peers(Some(limit)); + self + } + /// Override default p2p task spawner. pub fn p2p_custom_task_spawner( &mut self, @@ -274,15 +290,15 @@ impl NodeBuilder { self } - pub fn build(self) -> anyhow::Result { + pub fn build(mut self) -> anyhow::Result { let p2p_sec_key = self.p2p_sec_key.unwrap_or_else(P2pSecretKey::rand); - let initial_peers = if self.initial_peers.is_empty() && !self.p2p_is_seed { - default_peers() - } else { - self.initial_peers - }; + if self.p2p.initial_peers.is_empty() && !self.p2p_is_seed { + self.p2p.initial_peers = default_peers(); + } - let initial_peers = initial_peers + self.p2p.initial_peers = self + .p2p + .initial_peers .into_iter() .filter_map(|opts| match opts { P2pConnectionOutgoingInitOpts::LibP2P(mut opts) => { @@ -293,8 +309,6 @@ impl NodeBuilder { }) .collect(); - let external_addrs = self.external_addrs; - let srs = self.verifier_srs.unwrap_or_else(get_srs); let block_verifier_index = self .block_verifier_index @@ -306,6 +320,9 @@ impl NodeBuilder { let initial_time = self .custom_initial_time .unwrap_or_else(redux::Timestamp::global_now); + self.p2p.meshsub.initial_time = initial_time + .checked_sub(redux::Timestamp::ZERO) + .unwrap_or_default(); let protocol_constants = self.genesis_config.protocol_constants()?; let consensus_consts = @@ -319,23 +336,7 @@ impl NodeBuilder { consensus_constants: consensus_consts.clone(), testing_run: false, }, - p2p: P2pConfig { - libp2p_port: self.p2p_libp2p_port, - listen_port: self.http_port, - identity_pub_key: p2p_sec_key.public_key(), - initial_peers, - external_addrs, - enabled_channels: ChannelId::iter_all().collect(), - peer_discovery: !self.p2p_no_discovery, - meshsub: P2pMeshsubConfig { - initial_time: initial_time - .checked_sub(redux::Timestamp::ZERO) - .unwrap_or_default(), - ..Default::default() - }, - timeouts: P2pTimeouts::default(), - limits: P2pLimits::default().with_max_peers(Some(100)), - }, + p2p: self.p2p, ledger: LedgerConfig {}, snark: SnarkConfig { block_verifier_index,