Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions cli/src/commands/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub struct Node {
#[arg(long, env)]
pub peer_list_url: Option<Url>,

#[arg(long, default_value = "100")]
pub max_peers: usize,

/// Run the node in seed mode. No default peers will be added.
#[arg(long, env)]
pub seed: bool,
Expand Down Expand Up @@ -206,6 +209,7 @@ impl Node {
.filter_map(|s| s.parse().ok()),
);

node_builder.p2p_max_peers(self.max_peers);
self.seed.then(|| node_builder.p2p_seed_node());
self.no_peers_discovery
.then(|| node_builder.p2p_no_discovery());
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ tracing = { version = "0.1", features = ["std"] }
sha2 = "0.10.6"
binprot = { git = "https://github.com/openmina/binprot-rs", rev = "2b5a909" }
binprot_derive = { git = "https://github.com/openmina/binprot-rs", rev = "2b5a909" }
rand = "0.8.0"
redux = { workspace = true }
tokio = { version = "1.26", features = ["sync"] }
time = { version = "0.3", features = ["formatting", "macros", "parsing"] }
Expand Down
6 changes: 6 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ mod work_dir {

pub use work_dir::{get_debug_dir, get_work_dir, set_work_dir};

use rand::prelude::*;
#[inline(always)]
pub fn pseudo_rng(time: redux::Timestamp) -> StdRng {
StdRng::seed_from_u64(time.into())
}

pub fn preshared_key(chain_id: &ChainId) -> [u8; 32] {
use multihash::Hasher;
let mut hasher = Blake2b256::default();
Expand Down
81 changes: 41 additions & 40 deletions node/native/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
net::IpAddr,
path::Path,
sync::Arc,
time::Duration,
};

use anyhow::Context;
Expand Down Expand Up @@ -34,13 +35,10 @@ pub struct NodeBuilder {
rng_seed: [u8; 32],
custom_initial_time: Option<redux::Timestamp>,
genesis_config: Arc<GenesisConfig>,
p2p: P2pConfig,
p2p_sec_key: Option<P2pSecretKey>,
p2p_libp2p_port: Option<u16>,
p2p_is_seed: bool,
p2p_no_discovery: bool,
p2p_is_started: bool,
initial_peers: Vec<P2pConnectionOutgoingInitOpts>,
external_addrs: Vec<IpAddr>,
block_producer: Option<BlockProducerConfig>,
snarker: Option<SnarkerConfig>,
service: NodeServiceBuilder,
Expand Down Expand Up @@ -68,13 +66,25 @@ impl NodeBuilder {
rng_seed,
custom_initial_time: None,
genesis_config,
p2p: P2pConfig {
libp2p_port: None,
listen_port: None,
// Must be replaced with builder api.
identity_pub_key: P2pSecretKey::deterministic(0).public_key(),
initial_peers: Vec::new(),
external_addrs: Vec::new(),
enabled_channels: ChannelId::iter_all().collect(),
peer_discovery: true,
meshsub: P2pMeshsubConfig {
initial_time: Duration::ZERO,
..Default::default()
},
timeouts: P2pTimeouts::default(),
limits: P2pLimits::default().with_max_peers(Some(100)),
},
p2p_sec_key: None,
p2p_libp2p_port: None,
p2p_is_seed: false,
p2p_no_discovery: false,
p2p_is_started: false,
initial_peers: Vec::new(),
external_addrs: Vec::new(),
block_producer: None,
snarker: None,
service: NodeServiceBuilder::new(rng_seed),
Expand All @@ -94,12 +104,13 @@ impl NodeBuilder {

/// If not called, random one will be generated and used instead.
pub fn p2p_sec_key(&mut self, key: P2pSecretKey) -> &mut Self {
self.p2p.identity_pub_key = key.public_key();
self.p2p_sec_key = Some(key);
self
}

pub fn p2p_libp2p_port(&mut self, port: u16) -> &mut Self {
self.p2p_libp2p_port = Some(port);
self.p2p.libp2p_port = Some(port);
self
}

Expand All @@ -110,7 +121,7 @@ impl NodeBuilder {
}

pub fn p2p_no_discovery(&mut self) -> &mut Self {
self.p2p_no_discovery = true;
self.p2p.peer_discovery = false;
self
}

Expand All @@ -119,19 +130,19 @@ impl NodeBuilder {
&mut self,
peers: impl IntoIterator<Item = P2pConnectionOutgoingInitOpts>,
) -> &mut Self {
self.initial_peers.extend(peers);
self.p2p.initial_peers.extend(peers);
self
}

pub fn external_addrs(&mut self, v: impl Iterator<Item = IpAddr>) -> &mut Self {
self.external_addrs.extend(v);
self.p2p.external_addrs.extend(v);
self
}

/// Extend p2p initial peers from file.
pub fn initial_peers_from_file(&mut self, path: impl AsRef<Path>) -> anyhow::Result<&mut Self> {
peers_from_reader(
&mut self.initial_peers,
&mut self.p2p.initial_peers,
File::open(&path).context(anyhow::anyhow!(
"opening peer list file {:?}",
path.as_ref()
Expand All @@ -152,14 +163,19 @@ impl NodeBuilder {
) -> anyhow::Result<&mut Self> {
let url = url.into_url().context("failed to parse peers url")?;
peers_from_reader(
&mut self.initial_peers,
&mut self.p2p.initial_peers,
reqwest::blocking::get(url.clone())
.context(anyhow::anyhow!("reading peer list url {url}"))?,
)
.context(anyhow::anyhow!("reading peer list url {url}"))?;
Ok(self)
}

pub fn p2p_max_peers(&mut self, limit: usize) -> &mut Self {
self.p2p.limits = self.p2p.limits.with_max_peers(Some(limit));
self
}

/// Override default p2p task spawner.
pub fn p2p_custom_task_spawner(
&mut self,
Expand Down Expand Up @@ -274,15 +290,15 @@ impl NodeBuilder {
self
}

pub fn build(self) -> anyhow::Result<Node> {
pub fn build(mut self) -> anyhow::Result<Node> {
let p2p_sec_key = self.p2p_sec_key.unwrap_or_else(P2pSecretKey::rand);
let initial_peers = if self.initial_peers.is_empty() && !self.p2p_is_seed {
default_peers()
} else {
self.initial_peers
};
if self.p2p.initial_peers.is_empty() && !self.p2p_is_seed {
self.p2p.initial_peers = default_peers();
}

let initial_peers = initial_peers
self.p2p.initial_peers = self
.p2p
.initial_peers
.into_iter()
.filter_map(|opts| match opts {
P2pConnectionOutgoingInitOpts::LibP2P(mut opts) => {
Expand All @@ -293,8 +309,6 @@ impl NodeBuilder {
})
.collect();

let external_addrs = self.external_addrs;

let srs = self.verifier_srs.unwrap_or_else(get_srs);
let block_verifier_index = self
.block_verifier_index
Expand All @@ -306,6 +320,9 @@ impl NodeBuilder {
let initial_time = self
.custom_initial_time
.unwrap_or_else(redux::Timestamp::global_now);
self.p2p.meshsub.initial_time = initial_time
.checked_sub(redux::Timestamp::ZERO)
.unwrap_or_default();

let protocol_constants = self.genesis_config.protocol_constants()?;
let consensus_consts =
Expand All @@ -319,23 +336,7 @@ impl NodeBuilder {
consensus_constants: consensus_consts.clone(),
testing_run: false,
},
p2p: P2pConfig {
libp2p_port: self.p2p_libp2p_port,
listen_port: self.http_port,
identity_pub_key: p2p_sec_key.public_key(),
initial_peers,
external_addrs,
enabled_channels: ChannelId::iter_all().collect(),
peer_discovery: !self.p2p_no_discovery,
meshsub: P2pMeshsubConfig {
initial_time: initial_time
.checked_sub(redux::Timestamp::ZERO)
.unwrap_or_default(),
..Default::default()
},
timeouts: P2pTimeouts::default(),
limits: P2pLimits::default().with_max_peers(Some(100)),
},
p2p: self.p2p,
ledger: LedgerConfig {},
snark: SnarkConfig {
block_verifier_index,
Expand Down
4 changes: 3 additions & 1 deletion node/src/action_kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ pub enum ActionKind {
P2pDisconnectionFinish,
P2pDisconnectionInit,
P2pDisconnectionPeerClosed,
P2pDisconnectionRandomTry,
P2pDisconnectionEffectfulInit,
P2pEffectfulInitialize,
P2pIdentifyNewRequest,
Expand Down Expand Up @@ -705,7 +706,7 @@ pub enum ActionKind {
}

impl ActionKind {
pub const COUNT: u16 = 595;
pub const COUNT: u16 = 596;
}

impl std::fmt::Display for ActionKind {
Expand Down Expand Up @@ -1212,6 +1213,7 @@ impl ActionKindGet for P2pConnectionAction {
impl ActionKindGet for P2pDisconnectionAction {
fn kind(&self) -> ActionKind {
match self {
Self::RandomTry => ActionKind::P2pDisconnectionRandomTry,
Self::Init { .. } => ActionKind::P2pDisconnectionInit,
Self::PeerClosed { .. } => ActionKind::P2pDisconnectionPeerClosed,
Self::FailedCleanup { .. } => ActionKind::P2pDisconnectionFailedCleanup,
Expand Down
2 changes: 1 addition & 1 deletion node/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ impl State {
}

pub fn pseudo_rng(&self) -> StdRng {
StdRng::seed_from_u64(self.time().into())
crate::core::pseudo_rng(self.time())
}

/// Must be called in the global reducer as the last thing only once
Expand Down
2 changes: 2 additions & 0 deletions p2p/src/disconnection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::{

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, thiserror::Error)]
pub enum P2pDisconnectionReason {
#[error("random disconnection in order to free up space so that more peers can connect")]
FreeUpSpace,
#[error("message is unexpected for channel {0}")]
P2pChannelMsgUnexpected(ChannelId),
#[error("failed to send message to channel: {0}")]
Expand Down
22 changes: 18 additions & 4 deletions p2p/src/disconnection/p2p_disconnection_actions.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use openmina_core::ActionEvent;
use serde::{Deserialize, Serialize};

Expand All @@ -6,9 +8,12 @@ use crate::{P2pPeerStatus, P2pState, PeerId};

pub type P2pDisconnectionActionWithMetaRef<'a> = redux::ActionWithMeta<&'a P2pDisconnectionAction>;

const RANDOM_DISCONNECTION_TRY_FREQUENCY: Duration = Duration::from_secs(10);

#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
#[action_event(level = debug)]
pub enum P2pDisconnectionAction {
RandomTry,
/// Initialize disconnection.
#[action_event(fields(display(peer_id), display(reason)), level = info)]
Init {
Expand All @@ -17,17 +22,26 @@ pub enum P2pDisconnectionAction {
},
/// Peer disconnection.
#[action_event(fields(display(peer_id)), level = info)]
PeerClosed { peer_id: PeerId },
PeerClosed {
peer_id: PeerId,
},
#[action_event(fields(display(peer_id)), level = info)]
FailedCleanup { peer_id: PeerId },
FailedCleanup {
peer_id: PeerId,
},
/// Finish disconnecting from a peer.
#[action_event(fields(display(peer_id)), level = debug)]
Finish { peer_id: PeerId },
Finish {
peer_id: PeerId,
},
}

impl redux::EnablingCondition<P2pState> for P2pDisconnectionAction {
fn is_enabled(&self, state: &P2pState, _time: redux::Timestamp) -> bool {
fn is_enabled(&self, state: &P2pState, time: redux::Timestamp) -> bool {
match self {
P2pDisconnectionAction::RandomTry => time
.checked_sub(state.last_random_disconnection_try)
.map_or(false, |dur| dur >= RANDOM_DISCONNECTION_TRY_FREQUENCY),
P2pDisconnectionAction::Init { peer_id, .. }
| P2pDisconnectionAction::PeerClosed { peer_id, .. }
| P2pDisconnectionAction::Finish { peer_id } => {
Expand Down
34 changes: 32 additions & 2 deletions p2p/src/disconnection/p2p_disconnection_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use openmina_core::{bug_condition, Substate};
use std::time::Duration;

use openmina_core::{bug_condition, pseudo_rng, Substate};
use rand::prelude::*;
use redux::ActionWithMeta;

use crate::{
disconnection_effectful::P2pDisconnectionEffectfulAction, P2pNetworkSchedulerAction,
P2pPeerAction, P2pPeerStatus, P2pState,
};

use super::{P2pDisconnectedState, P2pDisconnectionAction};
use super::{P2pDisconnectedState, P2pDisconnectionAction, P2pDisconnectionReason};

/// Do not disconnect peer for this duration just for freeing up peer space.
const FORCE_PEER_STABLE_FOR: Duration = Duration::from_secs(90);

impl P2pDisconnectedState {
pub fn reducer<Action, State>(
Expand All @@ -21,6 +27,30 @@ impl P2pDisconnectedState {
let p2p_state = state_context.get_substate_mut()?;

match action {
P2pDisconnectionAction::RandomTry => {
p2p_state.last_random_disconnection_try = meta.time();
if p2p_state.config.limits.max_stable_peers()
>= p2p_state.ready_peers_iter().count()
{
return Ok(());
}
let mut rng = pseudo_rng(meta.time());

let peer_id = p2p_state
.ready_peers_iter()
.filter(|(_, s)| s.connected_for(meta.time()) > FORCE_PEER_STABLE_FOR)
.map(|(id, _)| *id)
.choose(&mut rng);

if let Some(peer_id) = peer_id {
let dispatcher = state_context.into_dispatcher();
dispatcher.push(P2pDisconnectionAction::Init {
peer_id,
reason: P2pDisconnectionReason::FreeUpSpace,
});
}
Ok(())
}
P2pDisconnectionAction::Init { peer_id, reason } => {
#[cfg(feature = "p2p-libp2p")]
if p2p_state.is_libp2p_peer(&peer_id) {
Expand Down
5 changes: 5 additions & 0 deletions p2p/src/p2p_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,11 @@ impl P2pLimits {
min_peers(&self): self.max_peers.map(|v| (v / 2).max(3).min(v))
);

limit!(
/// Above this limit, peers will be randomly disconnected to free up space.
max_stable_peers(&self): self.max_peers.map(|v| v.saturating_mul(8).saturating_div(10))
);

limit!(
/// Maximum number of connections.
max_connections(&self): self.max_peers.map(|v| v + 10)
Expand Down
Loading
Loading