diff --git a/Cargo.lock b/Cargo.lock index 9fa386ab1d..9ef65b515f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -180,7 +180,7 @@ version = "0.1.1" source = "git+https://github.com/openmina/alloc-test.git#0d3951eaddc58a6ea2d2134966ed5954234889d3" dependencies = [ "clap 4.5.2", - "derive_builder 0.11.2", + "derive_builder", "derive_more", "num", "serde", @@ -1586,33 +1586,13 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "derive-getters" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2c35ab6e03642397cdda1dd58abbc05d418aef8e36297f336d5aba060fe8df" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "derive_builder" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d07adf7be193b71cc36b193d0f5fe60b918a3a9db4dad0449f57bcfd519704a3" dependencies = [ - "derive_builder_macro 0.11.2", -] - -[[package]] -name = "derive_builder" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0350b5cb0331628a5916d6c5c0b72e97393b8b6b03b47a9284f4e7f5a405ffd7" -dependencies = [ - "derive_builder_macro 0.20.0", + "derive_builder_macro", ] [[package]] @@ -1627,38 +1607,16 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "derive_builder_core" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d48cda787f839151732d396ac69e3473923d54312c070ee21e9effcaa8ca0b1d" -dependencies = [ - "darling 0.20.6", - "proc-macro2", - "quote", - "syn 2.0.58", -] - [[package]] name = "derive_builder_macro" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f0314b72bed045f3a68671b3c86328386762c93f82d98c65c3cb5e5f573dd68" dependencies = [ - "derive_builder_core 0.11.2", + "derive_builder_core", "syn 1.0.109", ] -[[package]] -name = "derive_builder_macro" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b" -dependencies = [ - "derive_builder_core 0.20.0", - "syn 2.0.58", -] - [[package]] name = "derive_more" version = "0.99.17" @@ -4847,8 +4805,6 @@ dependencies = [ name = "p2p-testing" version = "0.4.0" dependencies = [ - "derive-getters", - "derive_builder 0.20.0", "derive_more", "futures", "getrandom 0.2.14", diff --git a/cli/src/commands/node/mod.rs b/cli/src/commands/node/mod.rs index 61dde0b009..0ba5ffd882 100644 --- a/cli/src/commands/node/mod.rs +++ b/cli/src/commands/node/mod.rs @@ -26,7 +26,7 @@ use node::p2p::channels::ChannelId; use node::p2p::connection::outgoing::P2pConnectionOutgoingInitOpts; use node::p2p::identity::SecretKey; use node::p2p::service_impl::webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p; -use node::p2p::{P2pConfig, P2pTimeouts}; +use node::p2p::{P2pConfig, P2pLimits, P2pTimeouts}; use node::service::{Recorder, Service}; use node::snark::{get_srs, get_verifier_index, VerifierKind}; use node::stats::Stats; @@ -216,14 +216,14 @@ impl Node { listen_port: self.port, identity_pub_key: pub_key, initial_peers: self.peers, - max_peers: 100, ask_initial_peers_interval: Duration::from_secs(3600), enabled_channels: ChannelId::for_libp2p().collect(), - timeouts: P2pTimeouts::default(), peer_discovery: !self.no_peers_discovery, initial_time: SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("linear time"), + timeouts: P2pTimeouts::default(), + limits: P2pLimits::default().with_max_peers(Some(100)), }, transition_frontier, block_producer: block_producer.clone().map(|(config, _)| config), diff --git a/node/testing/src/cluster/mod.rs b/node/testing/src/cluster/mod.rs index f3df817dec..34a27e2755 100644 --- a/node/testing/src/cluster/mod.rs +++ b/node/testing/src/cluster/mod.rs @@ -26,7 +26,7 @@ use node::core::warn; use node::p2p::service_impl::{ webrtc::P2pServiceCtx, webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p, }; -use node::p2p::{P2pConnectionEvent, P2pEvent, PeerId}; +use node::p2p::{P2pConnectionEvent, P2pEvent, P2pLimits, PeerId}; use node::snark::{VerifierIndex, VerifierSRS}; use node::{ event_source::Event, @@ -278,11 +278,11 @@ impl Cluster { listen_port: http_port, identity_pub_key: pub_key, initial_peers, - max_peers: testing_config.max_peers, ask_initial_peers_interval: testing_config.ask_initial_peers_interval, enabled_channels: ChannelId::iter_all().collect(), - timeouts: testing_config.timeouts, peer_discovery: true, + timeouts: testing_config.timeouts, + limits: P2pLimits::default().with_max_peers(Some(testing_config.max_peers)), initial_time: testing_config .initial_time .checked_sub(redux::Timestamp::ZERO) diff --git a/node/testing/src/scenarios/p2p/basic_connection_handling.rs b/node/testing/src/scenarios/p2p/basic_connection_handling.rs index 12532fe8a9..d1c775c7f4 100644 --- a/node/testing/src/scenarios/p2p/basic_connection_handling.rs +++ b/node/testing/src/scenarios/p2p/basic_connection_handling.rs @@ -247,7 +247,10 @@ impl MaxNumberOfPeersIncoming { // check that the number of ready peers does not exceed the maximal allowed number let state = driver.inner().node(node_ut).unwrap().state(); let count = state.p2p.ready_peers_iter().count(); - assert!(count <= MAX.into(), "max number of peers exceeded: {count}"); + assert!( + count <= usize::from(MAX), + "max number of peers exceeded: {count}" + ); // check that the number of nodes with the node as their peer does not exceed the maximal allowed number let peers_connected = || { @@ -263,7 +266,7 @@ impl MaxNumberOfPeersIncoming { }) }; assert!( - peers_connected().count() <= MAX.into(), + peers_connected().count() <= usize::from(MAX), "peers connections to the node exceed the max number of connections: {}", peers_connected().count() ); diff --git a/p2p/src/disconnection/mod.rs b/p2p/src/disconnection/mod.rs index 2c87c825e2..ad399b4d77 100644 --- a/p2p/src/disconnection/mod.rs +++ b/p2p/src/disconnection/mod.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use crate::{channels::ChannelId, connection::RejectionReason}; -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, thiserror::Error)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, thiserror::Error)] pub enum P2pDisconnectionReason { #[error("message is unexpected for channel {0}")] P2pChannelMsgUnexpected(ChannelId), @@ -35,9 +35,6 @@ pub enum P2pDisconnectionReason { #[error("duplicate connection")] DuplicateConnection, - #[error("select error")] - SelectError, - #[error("timeout")] Timeout, } diff --git a/p2p/src/identity/peer_id.rs b/p2p/src/identity/peer_id.rs index ecb1d96296..c9f63c2046 100644 --- a/p2p/src/identity/peer_id.rs +++ b/p2p/src/identity/peer_id.rs @@ -63,7 +63,7 @@ impl fmt::Debug for PeerId { } } -#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, thiserror::Error, Serialize, Deserialize)] pub enum PeerIdFromLibp2pPeerId { #[error("error decoding public key from protobuf: {0}")] Protobuf(String), diff --git a/p2p/src/network/identify/p2p_network_identify_protocol.rs b/p2p/src/network/identify/p2p_network_identify_protocol.rs index 4314d64455..5c437b708e 100644 --- a/p2p/src/network/identify/p2p_network_identify_protocol.rs +++ b/p2p/src/network/identify/p2p_network_identify_protocol.rs @@ -17,7 +17,7 @@ pub struct P2pNetworkIdentify { pub protocols: Vec, } -#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, thiserror::Error)] pub enum P2pNetworkIdentifyFromMessageError { #[error("cant parse protocol: {0}")] UnsupportedProtocol(String), diff --git a/p2p/src/network/identify/p2p_network_identify_reducer.rs b/p2p/src/network/identify/p2p_network_identify_reducer.rs index 166f2235b9..890356d03d 100644 --- a/p2p/src/network/identify/p2p_network_identify_reducer.rs +++ b/p2p/src/network/identify/p2p_network_identify_reducer.rs @@ -1,11 +1,14 @@ use redux::ActionWithMeta; +use crate::P2pLimits; + use super::stream::P2pNetworkIdentifyStreamAction; impl super::P2pNetworkIdentifyState { pub fn reducer( &mut self, action: ActionWithMeta<&super::P2pNetworkIdentifyAction>, + limits: &P2pLimits, ) -> Result<(), String> { let (action, meta) = action.split(); match action { @@ -16,7 +19,7 @@ impl super::P2pNetworkIdentifyState { .map_err(|stream| { format!("Identify stream already exists for action {action:?}: {stream:?}") }) - .and_then(|stream| stream.reducer(meta.with_action(action))), + .and_then(|stream| stream.reducer(meta.with_action(action), limits)), super::P2pNetworkIdentifyAction::Stream( action @ P2pNetworkIdentifyStreamAction::Prune { .. }, ) => self @@ -26,7 +29,7 @@ impl super::P2pNetworkIdentifyState { super::P2pNetworkIdentifyAction::Stream(action) => self .find_identify_stream_state_mut(action.peer_id(), action.stream_id()) .ok_or_else(|| format!("Identify stream not found for action {action:?}")) - .and_then(|stream| stream.reducer(meta.with_action(action))), + .and_then(|stream| stream.reducer(meta.with_action(action), limits)), } } } diff --git a/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs b/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs index 0705876f33..a1885940fb 100644 --- a/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs +++ b/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs @@ -8,7 +8,10 @@ use super::{ super::{pb, P2pNetworkIdentify}, P2pNetworkIdentifyStreamAction, }; -use crate::{identify::P2pIdentifyAction, token, Data, P2pNetworkService, P2pNetworkYamuxAction}; +use crate::{ + identify::P2pIdentifyAction, network::identify::stream::P2pNetworkIdentifyStreamError, token, + Data, P2pNetworkSchedulerAction, P2pNetworkService, P2pNetworkYamuxAction, +}; fn get_addrs(addr: &SocketAddr, net_svc: &mut S) -> I where @@ -171,7 +174,11 @@ impl P2pNetworkIdentifyStreamAction { Ok(()) } S::Error(err) => { - warn!(meta.time(); summary = "error handling Identify action", error = err, action = format!("{self:?}")); + warn!(meta.time(); summary = "error handling Identify action", error = display(err)); + store.dispatch(P2pNetworkSchedulerAction::Error { + addr, + error: P2pNetworkIdentifyStreamError::from(err.clone()).into(), + }); Ok(()) } _ => unimplemented!(), diff --git a/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs b/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs index 8fc6b9cd35..62b175098f 100644 --- a/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs +++ b/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs @@ -1,7 +1,10 @@ use super::{ P2pNetworkIdentifyStreamAction, P2pNetworkIdentifyStreamKind, P2pNetworkIdentifyStreamState, }; -use crate::network::identify::{pb::Identify, P2pNetworkIdentify}; +use crate::{ + network::identify::{pb::Identify, P2pNetworkIdentify}, + P2pLimits, P2pNetworkStreamProtobufError, +}; use prost::Message; use quick_protobuf::BytesReader; use redux::ActionWithMeta; @@ -10,6 +13,7 @@ impl P2pNetworkIdentifyStreamState { pub fn reducer( &mut self, action: ActionWithMeta<&P2pNetworkIdentifyStreamAction>, + limits: &P2pLimits, ) -> Result<(), String> { use super::P2pNetworkIdentifyStreamAction as A; use super::P2pNetworkIdentifyStreamState as S; @@ -38,13 +42,16 @@ impl P2pNetworkIdentifyStreamState { let data = &data.0; let mut reader = BytesReader::from_bytes(data); let Ok(len) = reader.read_varint32(data).map(|v| v as usize) else { - *self = S::Error("error reading message length".to_owned()); + *self = S::Error(P2pNetworkStreamProtobufError::MessageLength); return Ok(()); }; // TODO: implement as configuration option - if len > 0x1000 { - *self = S::Error(format!("Identify message is too long ({})", len)); + if len > limits.identify_message() { + *self = S::Error(P2pNetworkStreamProtobufError::Limit( + len, + limits.identify_message(), + )); return Ok(()); } @@ -94,9 +101,9 @@ impl P2pNetworkIdentifyStreamState { let message = match Identify::decode(&data[..len]) { Ok(v) => v, Err(e) => { - *self = P2pNetworkIdentifyStreamState::Error(format!( - "error reading protobuf message: {e}" - )); + *self = P2pNetworkIdentifyStreamState::Error( + P2pNetworkStreamProtobufError::Message(e.to_string()), + ); return Ok(()); } }; @@ -104,9 +111,7 @@ impl P2pNetworkIdentifyStreamState { let data = match P2pNetworkIdentify::try_from(message.clone()) { Ok(v) => v, Err(e) => { - *self = P2pNetworkIdentifyStreamState::Error(format!( - "error converting protobuf message: {e}" - )); + *self = P2pNetworkIdentifyStreamState::Error(e.into()); return Ok(()); } }; diff --git a/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs b/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs index b3c6ce4a14..7a4a2079ea 100644 --- a/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs +++ b/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs @@ -1,4 +1,7 @@ -use crate::network::identify::P2pNetworkIdentify; +use crate::{ + network::identify::{P2pNetworkIdentify, P2pNetworkIdentifyFromMessageError}, + P2pNetworkStreamProtobufError, +}; use serde::{Deserialize, Serialize}; #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -36,7 +39,7 @@ pub enum P2pNetworkIdentifyStreamState { data: P2pNetworkIdentify, }, /// Error handling the stream. - Error(String), + Error(P2pNetworkStreamProtobufError), } impl P2pNetworkIdentifyStreamState { @@ -50,3 +53,9 @@ impl From for P2pNetworkIdentifyStreamState { P2pNetworkIdentifyStreamState::IdentifyReceived { data } } } + +#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] +#[error("identify stream: {0}")] +pub struct P2pNetworkIdentifyStreamError( + #[from] P2pNetworkStreamProtobufError, +); diff --git a/p2p/src/network/kad/p2p_network_kad_internals.rs b/p2p/src/network/kad/p2p_network_kad_internals.rs index 95954b57a5..124930a0ef 100644 --- a/p2p/src/network/kad/p2p_network_kad_internals.rs +++ b/p2p/src/network/kad/p2p_network_kad_internals.rs @@ -369,7 +369,7 @@ impl P2pNetworkKadEntry { } } -#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)] +#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, thiserror::Error)] pub enum P2pNetworkKadEntryTryFromError { #[error(transparent)] PeerId(#[from] P2pNetworkKademliaPeerIdError), diff --git a/p2p/src/network/kad/p2p_network_kad_protocol.rs b/p2p/src/network/kad/p2p_network_kad_protocol.rs index 1b97ff6d77..2fdf56238a 100644 --- a/p2p/src/network/kad/p2p_network_kad_protocol.rs +++ b/p2p/src/network/kad/p2p_network_kad_protocol.rs @@ -57,7 +57,7 @@ impl P2pNetworkKademliaRpcRequest { } } -#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)] +#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, thiserror::Error)] pub enum P2pNetworkKademliaPeerIdError { #[error("error decoding PeerId from bytes: lenght {0} while expected 32")] Parse(String), @@ -99,7 +99,7 @@ pub enum P2pNetworkKademliaRpcPeerTryFromError { Multiaddr(#[from] P2pNetworkKademliaMultiaddrError), } -#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, thiserror::Error)] #[error("error decoding Multiaddr from bytes: {0}")] pub struct P2pNetworkKademliaMultiaddrError(String); @@ -115,7 +115,7 @@ impl From for P2pNetworkKademliaMultiaddrError { } } -#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, thiserror::Error)] pub enum P2pNetworkKademliaRpcFromMessageError { #[error(transparent)] PeerId(#[from] P2pNetworkKademliaPeerIdError), diff --git a/p2p/src/network/kad/p2p_network_kad_reducer.rs b/p2p/src/network/kad/p2p_network_kad_reducer.rs index 3e66335f3e..e420c7a597 100644 --- a/p2p/src/network/kad/p2p_network_kad_reducer.rs +++ b/p2p/src/network/kad/p2p_network_kad_reducer.rs @@ -1,13 +1,17 @@ use redux::ActionWithMeta; -use crate::P2pNetworkKadEntry; +use crate::{P2pLimits, P2pNetworkKadEntry}; use super::{P2pNetworkKadAction, P2pNetworkKadLatestRequestPeerKind, P2pNetworkKadStatus}; use super::stream::P2pNetworkKademliaStreamAction; impl super::P2pNetworkKadState { - pub fn reducer(&mut self, action: ActionWithMeta<&P2pNetworkKadAction>) -> Result<(), String> { + pub fn reducer( + &mut self, + action: ActionWithMeta<&P2pNetworkKadAction>, + limits: &P2pLimits, + ) -> Result<(), String> { let (action, meta) = action.split(); match action { P2pNetworkKadAction::System(action) => self.system_reducer(meta.with_action(action)), @@ -48,7 +52,7 @@ impl super::P2pNetworkKadState { .map_err(|stream| { format!("kademlia stream already exists for action {action:?}: {stream:?}") }) - .and_then(|stream| stream.reducer(meta.with_action(action))), + .and_then(|stream| stream.reducer(meta.with_action(action), limits)), P2pNetworkKadAction::Stream(action @ P2pNetworkKademliaStreamAction::Prune { .. }) => { self.remove_kad_stream_state(action.peer_id(), action.stream_id()) .then_some(()) @@ -57,7 +61,7 @@ impl super::P2pNetworkKadState { P2pNetworkKadAction::Stream(action) => self .find_kad_stream_state_mut(action.peer_id(), action.stream_id()) .ok_or_else(|| format!("kademlia stream not found for action {action:?}")) - .and_then(|stream| stream.reducer(meta.with_action(action))), + .and_then(|stream| stream.reducer(meta.with_action(action), limits)), } } diff --git a/p2p/src/network/kad/stream/p2p_network_kad_stream_effects.rs b/p2p/src/network/kad/stream/p2p_network_kad_stream_effects.rs index f237d49fa8..7012c0c4d1 100644 --- a/p2p/src/network/kad/stream/p2p_network_kad_stream_effects.rs +++ b/p2p/src/network/kad/stream/p2p_network_kad_stream_effects.rs @@ -1,7 +1,10 @@ use openmina_core::warn; use redux::ActionMeta; -use crate::{Data, P2pNetworkKademliaAction, P2pNetworkYamuxAction}; +use crate::{ + stream::{P2pNetworkKadIncomingStreamError, P2pNetworkKadOutgoingStreamError}, + Data, P2pNetworkKademliaAction, P2pNetworkSchedulerAction, P2pNetworkYamuxAction, +}; use super::{ super::{P2pNetworkKademliaRpcReply, P2pNetworkKademliaRpcRequest}, @@ -179,7 +182,15 @@ impl P2pNetworkKademliaStreamAction { Ok(()) } (action, D::Incoming(I::Error(err)) | D::Outgoing(O::Error(err))) => { - warn!(meta.time(); summary = "error handling kademlia action", error = err, action = format!("{action:?}")); + warn!(meta.time(); summary = "error handling kademlia action", error = display(err)); + let error = match state { + D::Incoming(_) => P2pNetworkKadIncomingStreamError::from(err.clone()).into(), + D::Outgoing(_) => P2pNetworkKadOutgoingStreamError::from(err.clone()).into(), + }; + store.dispatch(P2pNetworkSchedulerAction::Error { + addr: *action.addr(), + error, + }); Ok(()) } (action, _) => Err(format!("incorrect state {state:?} for action {action:?}")), diff --git a/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs b/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs index 3d3cd79e26..33acbb7eef 100644 --- a/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs +++ b/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs @@ -1,7 +1,10 @@ use quick_protobuf::{serialize_into_vec, BytesReader}; use redux::ActionWithMeta; -use crate::{P2pNetworkKademliaRpcReply, P2pNetworkKademliaRpcRequest}; +use crate::{ + P2pLimits, P2pNetworkKademliaRpcReply, P2pNetworkKademliaRpcRequest, + P2pNetworkStreamProtobufError, +}; use super::{ super::Message, P2pNetworkKadIncomingStreamState, P2pNetworkKadOutgoingStreamState, @@ -12,6 +15,7 @@ impl P2pNetworkKadIncomingStreamState { pub fn reducer( &mut self, action: ActionWithMeta<&P2pNetworkKademliaStreamAction>, + limits: &P2pLimits, ) -> Result<(), String> { use super::P2pNetworkKadIncomingStreamState as S; use super::P2pNetworkKademliaStreamAction as A; @@ -30,10 +34,19 @@ impl P2pNetworkKadIncomingStreamState { let mut reader = BytesReader::from_bytes(data); let Ok(len) = reader.read_varint32(data).map(|v| v as usize) else { - *self = S::Error("error reading message length".to_owned()); + *self = S::Error(P2pNetworkStreamProtobufError::MessageLength); return Ok(()); }; + println!("=== kademlia request len: {len}"); + if len > limits.kademlia_request() { + *self = S::Error(P2pNetworkStreamProtobufError::Limit( + len, + limits.kademlia_request(), + )); + return Ok(()); + } + if len > reader.len() { *self = S::PartialRequestReceived { len, @@ -87,7 +100,7 @@ impl P2pNetworkKadIncomingStreamState { let message = match reader.read_message_by_len::(data, len) { Ok(v) => v, Err(e) => { - *self = Error(format!("error reading protobuf message: {e}")); + *self = Error(P2pNetworkStreamProtobufError::Message(e.to_string())); return Ok(()); } }; @@ -95,7 +108,7 @@ impl P2pNetworkKadIncomingStreamState { let data = match P2pNetworkKademliaRpcRequest::try_from(message.clone()) { Ok(v) => v, Err(e) => { - *self = Error(format!("error converting protobuf message: {e}")); + *self = Error(e.into()); return Ok(()); } }; @@ -109,12 +122,11 @@ impl P2pNetworkKadOutgoingStreamState { pub fn reducer( &mut self, action: ActionWithMeta<&P2pNetworkKademliaStreamAction>, + limits: &P2pLimits, ) -> Result<(), String> { use super::P2pNetworkKadOutgoingStreamState as S; use super::P2pNetworkKademliaStreamAction as A; let (action, _meta) = action.split(); - // println!("=== state: {self:?}"); - // println!("=== action: {action:?}"); match (&self, action) { (S::Default, A::New { incoming, .. }) if !*incoming => { *self = S::WaitingForRequest { @@ -139,10 +151,19 @@ impl P2pNetworkKadOutgoingStreamState { let mut reader = BytesReader::from_bytes(data); let Ok(len) = reader.read_varint32(data).map(|v| v as usize) else { - *self = S::Error("error reading message length".to_owned()); + *self = S::Error(P2pNetworkStreamProtobufError::MessageLength); return Ok(()); }; + println!("=== kademlia response len: {len}"); + if len > limits.kademlia_response() { + *self = S::Error(P2pNetworkStreamProtobufError::Limit( + len, + limits.kademlia_response(), + )); + return Ok(()); + } + if len > reader.len() { *self = S::PartialReplyReceived { len, @@ -190,7 +211,7 @@ impl P2pNetworkKadOutgoingStreamState { let message = match reader.read_message_by_len::(data, len) { Ok(v) => v, Err(e) => { - *self = Error(format!("error reading protobuf message: {e}")); + *self = Error(P2pNetworkStreamProtobufError::Message(e.to_string())); return Ok(()); } }; @@ -198,7 +219,7 @@ impl P2pNetworkKadOutgoingStreamState { let data = match P2pNetworkKademliaRpcReply::try_from(message.clone()) { Ok(v) => v, Err(e) => { - *self = Error(format!("error converting protobuf message: {e}")); + *self = Error(e.into()); return Ok(()); } }; @@ -212,10 +233,11 @@ impl P2pNetworkKadStreamState { pub fn reducer( &mut self, action: ActionWithMeta<&P2pNetworkKademliaStreamAction>, + limits: &P2pLimits, ) -> Result<(), String> { match self { - P2pNetworkKadStreamState::Incoming(i) => i.reducer(action), - P2pNetworkKadStreamState::Outgoing(o) => o.reducer(action), + P2pNetworkKadStreamState::Incoming(i) => i.reducer(action, limits), + P2pNetworkKadStreamState::Outgoing(o) => o.reducer(action, limits), } } } diff --git a/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs b/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs index 79e434b3e6..e14fc25303 100644 --- a/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs +++ b/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs @@ -1,6 +1,9 @@ use serde::{Deserialize, Serialize}; -use crate::{P2pNetworkKademliaRpcReply, P2pNetworkKademliaRpcRequest}; +use crate::{ + P2pNetworkKademliaRpcFromMessageError, P2pNetworkKademliaRpcReply, + P2pNetworkKademliaRpcRequest, P2pNetworkStreamProtobufError, +}; #[derive(Clone, Debug, Serialize, Deserialize)] pub enum P2pNetworkKadStreamState { @@ -39,7 +42,7 @@ pub enum P2pNetworkKadIncomingStreamState { Closed, /// Error handling the stream. /// TODO: use enum for errors. - Error(String), + Error(P2pNetworkStreamProtobufError), } #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub enum P2pNetworkKadOutgoingStreamState { @@ -61,5 +64,17 @@ pub enum P2pNetworkKadOutgoingStreamState { Closed, /// Error handling the stream. /// TODO: use enum for errors. - Error(String), + Error(P2pNetworkStreamProtobufError), } + +#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] +#[error("kademlia incoming stream: {0}")] +pub struct P2pNetworkKadIncomingStreamError( + #[from] P2pNetworkStreamProtobufError, +); + +#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] +#[error("kademlia incoming stream: {0}")] +pub struct P2pNetworkKadOutgoingStreamError( + #[from] P2pNetworkStreamProtobufError, +); diff --git a/p2p/src/network/mod.rs b/p2p/src/network/mod.rs index c3ba700fc9..aba1301242 100644 --- a/p2p/src/network/mod.rs +++ b/p2p/src/network/mod.rs @@ -1,4 +1,10 @@ mod p2p_network_actions; +use serde::Deserialize; +use serde::Serialize; + +use crate::Limit; + +use self::identify::stream::P2pNetworkIdentifyStreamError; pub use self::p2p_network_actions::*; mod p2p_network_service; @@ -24,6 +30,7 @@ pub mod noise; pub use self::noise::*; pub mod yamux; +use self::stream::{P2pNetworkKadIncomingStreamError, P2pNetworkKadOutgoingStreamError}; pub use self::yamux::*; pub mod identify; @@ -162,3 +169,16 @@ mod data { } } } + +/// Errors that might happen while handling protobuf messages received via a stream. +#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] +pub enum P2pNetworkStreamProtobufError { + #[error("error reading message length")] + MessageLength, + #[error("message is too long: {0} exceeds {1}")] + Limit(usize, Limit), + #[error("error reading message: {0}")] + Message(String), + #[error("error converting protobuf message: {0}")] + Convert(#[from] T), +} diff --git a/p2p/src/network/p2p_network_reducer.rs b/p2p/src/network/p2p_network_reducer.rs index 9b95d1c719..dd5efc0a13 100644 --- a/p2p/src/network/p2p_network_reducer.rs +++ b/p2p/src/network/p2p_network_reducer.rs @@ -1,7 +1,7 @@ use multiaddr::Multiaddr; use openmina_core::{error, ChainId}; -use crate::{identity::PublicKey, PeerId}; +use crate::{identity::PublicKey, P2pLimits, PeerId}; use super::*; @@ -47,7 +47,11 @@ impl P2pNetworkState { } impl P2pNetworkState { - pub fn reducer(&mut self, action: redux::ActionWithMeta<&P2pNetworkAction>) { + pub fn reducer( + &mut self, + action: redux::ActionWithMeta<&P2pNetworkAction>, + limits: &P2pLimits, + ) { let (action, meta) = action.split(); match action { P2pNetworkAction::Scheduler(a) => self.scheduler.reducer(meta.with_action(a)), @@ -88,7 +92,11 @@ impl P2pNetworkState { P2pNetworkAction::Identify(a) => { let time = meta.time(); // println!("======= identify reducer for {state:?}"); - if let Err(err) = self.scheduler.identify_state.reducer(meta.with_action(a)) { + if let Err(err) = self + .scheduler + .identify_state + .reducer(meta.with_action(a), limits) + { error!(time; "{err}"); } // println!("======= identify reducer result {state:?}"); @@ -99,18 +107,16 @@ impl P2pNetworkState { return; }; let time = meta.time(); - // println!("======= kad reducer for {state:?}"); - if let Err(err) = state.reducer(meta.with_action(a)) { + if let Err(err) = state.reducer(meta.with_action(a), limits) { error!(time; "{err}"); } - // println!("======= kad reducer result {state:?}"); } P2pNetworkAction::Pubsub(a) => { self.scheduler.broadcast_state.reducer(meta.with_action(a)) } P2pNetworkAction::Rpc(a) => { if let Some(state) = self.find_rpc_state_mut(a) { - state.reducer(meta.with_action(a)) + state.reducer(meta.with_action(a), limits) } } } diff --git a/p2p/src/network/rpc/p2p_network_rpc_reducer.rs b/p2p/src/network/rpc/p2p_network_rpc_reducer.rs index 8aa2de8fc8..b721ae0880 100644 --- a/p2p/src/network/rpc/p2p_network_rpc_reducer.rs +++ b/p2p/src/network/rpc/p2p_network_rpc_reducer.rs @@ -1,10 +1,18 @@ use binprot::BinProtRead; -use mina_p2p_messages::rpc_kernel::{MessageHeader, QueryHeader}; +use mina_p2p_messages::rpc_kernel::{MessageHeader, QueryHeader, RpcMethod}; + +use crate::{Limit, P2pLimits}; + +use self::p2p_network_rpc_state::P2pNetworkRpcError; use super::*; impl P2pNetworkRpcState { - pub fn reducer(&mut self, action: redux::ActionWithMeta<&P2pNetworkRpcAction>) { + pub fn reducer( + &mut self, + action: redux::ActionWithMeta<&P2pNetworkRpcAction>, + limits: &P2pLimits, + ) { if self.error.is_some() { return; } @@ -20,6 +28,10 @@ impl P2pNetworkRpcState { let buf = &self.buffer[offset..]; if let Some(len_bytes) = buf.get(..8).and_then(|s| s.try_into().ok()) { let len = u64::from_le_bytes(len_bytes) as usize; + if let Err(err) = self.check_rpc_limit(len, limits) { + self.error = Some(err); + return; + } if buf.len() >= 8 + len { offset += 8 + len; let mut slice = &buf[8..(8 + len)]; @@ -39,7 +51,7 @@ impl P2pNetworkRpcState { bytes: slice.to_vec().into(), }, Err(err) => { - self.error = Some(err.to_string()); + self.error = Some(P2pNetworkRpcError::Binprot(err.to_string())); continue; } }; @@ -85,4 +97,41 @@ impl P2pNetworkRpcState { _ => {} } } + + fn check_rpc_limit(&self, len: usize, limits: &P2pLimits) -> Result<(), P2pNetworkRpcError> { + let (limit, kind): (_, &[u8]) = if self.is_incoming { + // only requests are allowed + (limits.rpc_query(), b"") + } else if let Some(QueryHeader { tag, .. }) = self.pending.as_ref() { + use mina_p2p_messages::rpc::*; + match tag.as_ref() { + GetBestTipV2::NAME => (limits.rpc_get_best_tip(), GetBestTipV2::NAME), + AnswerSyncLedgerQueryV2::NAME => ( + limits.rpc_answer_sync_ledger_query(), + AnswerSyncLedgerQueryV2::NAME, + ), + GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME => ( + limits.rpc_get_staged_ledger(), + GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME, + ), + GetTransitionChainV2::NAME => ( + limits.rpc_get_transition_chain(), + GetTransitionChainV2::NAME, + ), + GetSomeInitialPeersV1ForV2::NAME => ( + limits.rpc_get_some_initial_peers(), + GetSomeInitialPeersV1ForV2::NAME, + ), + _ => (Limit::Some(0), b""), + } + } else { + (limits.rpc_service_message(), b"") + }; + let kind = String::from_utf8_lossy(kind); + if len > limit { + Err(P2pNetworkRpcError::Limit(kind.into_owned(), len, limit)) + } else { + Ok(()) + } + } } diff --git a/p2p/src/network/rpc/p2p_network_rpc_state.rs b/p2p/src/network/rpc/p2p_network_rpc_state.rs index 1e795149a8..f0e8d563d0 100644 --- a/p2p/src/network/rpc/p2p_network_rpc_state.rs +++ b/p2p/src/network/rpc/p2p_network_rpc_state.rs @@ -29,7 +29,7 @@ pub struct P2pNetworkRpcState { pub is_incoming: bool, pub buffer: Vec, pub incoming: VecDeque, - pub error: Option, + pub error: Option, } impl P2pNetworkRpcState { @@ -74,7 +74,7 @@ impl RpcMessage { .unwrap_or_default(); } Self::Query { header, bytes } => { - MessageHeader::Query(header) + MessageHeader::Query(header.clone()) .binprot_write(&mut v) .unwrap_or_default(); v.extend_from_slice(&bytes); @@ -92,3 +92,11 @@ impl RpcMessage { v } } + +#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)] +pub enum P2pNetworkRpcError { + #[error("error reading binprot message: {0}")] + Binprot(String), + #[error("message {0} with size {1} exceeds limit of {2}")] + Limit(String, usize, Limit), +} diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs b/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs index 03015804f6..791bda755a 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs @@ -8,7 +8,7 @@ use crate::{ incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction, P2pConnectionState, }, - disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, + disconnection::P2pDisconnectionAction, identify::P2pIdentifyAction, network::identify::P2pNetworkIdentifyStreamAction, request::{P2pNetworkKadRequestState, P2pNetworkKadRequestStatus}, @@ -41,7 +41,10 @@ impl P2pNetworkSchedulerAction { // TODO: handle this error? } Self::IncomingConnectionIsReady { listener, .. } => { - if store.state().already_has_max_peers() { + let state = store.state(); + if state.network.scheduler.connections.len() + >= state.config.limits.max_connections() + { store.service().send_mio_cmd(MioCmd::Refuse(listener)); } else { store.service().send_mio_cmd(MioCmd::Accept(listener)); @@ -319,13 +322,7 @@ impl P2pNetworkSchedulerAction { store.dispatch(P2pNetworkSchedulerAction::Prune { addr }); - if !matches!( - reason, - P2pNetworkConnectionCloseReason::Disconnect( - P2pDisconnectionReason::SelectError - ) - ) && reason.is_disconnected() - { + if reason.is_disconnected() { // statemachine behaviour should continue with this, i.e. dispatch P2pDisconnectionAction::Finish return; } diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs b/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs index a466840611..83e5d40e4a 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs @@ -2,8 +2,6 @@ use std::collections::BTreeMap; use openmina_core::error; -use crate::disconnection::P2pDisconnectionReason; - use super::{super::*, p2p_network_scheduler_state::P2pNetworkConnectionState, *}; impl P2pNetworkSchedulerState { @@ -141,9 +139,10 @@ impl P2pNetworkSchedulerState { connection.streams.remove(stream_id); } - connection.closed = Some(P2pNetworkConnectionCloseReason::Disconnect( - P2pDisconnectionReason::SelectError, - )); + connection.closed = Some(P2pNetworkConnectionError::SelectError.into()); + } + if let Some(connection) = self.connections.get_mut(addr) { + connection.closed = Some(P2pNetworkConnectionError::SelectError.into()); } else { unreachable!() } diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_state.rs b/p2p/src/network/scheduler/p2p_network_scheduler_state.rs index 2a5c9a425e..3bc6fd4408 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_state.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_state.rs @@ -77,7 +77,7 @@ impl P2pNetworkConnectionState { } } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, thiserror::Error)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, thiserror::Error)] pub enum P2pNetworkConnectionCloseReason { #[error("peer is disconnected: {0}")] Disconnect(#[from] P2pDisconnectionReason), @@ -94,7 +94,7 @@ impl P2pNetworkConnectionCloseReason { } /// P2p connection error. -#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] pub enum P2pNetworkConnectionError { #[error("mio error: {0}")] MioError(String), @@ -102,6 +102,14 @@ pub enum P2pNetworkConnectionError { Noise(#[from] NoiseError), #[error("remote peer closed connection")] RemoteClosed, + #[error("select protocol error")] + SelectError, + #[error(transparent)] + IdentifyStreamError(#[from] P2pNetworkIdentifyStreamError), + #[error(transparent)] + KademliaIncomingStreamError(#[from] P2pNetworkKadIncomingStreamError), + #[error(transparent)] + KademliaOutgoingStreamError(#[from] P2pNetworkKadOutgoingStreamError), } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/p2p/src/p2p_config.rs b/p2p/src/p2p_config.rs index 069d207e70..2633828638 100644 --- a/p2p/src/p2p_config.rs +++ b/p2p/src/p2p_config.rs @@ -23,11 +23,10 @@ pub struct P2pConfig { pub enabled_channels: BTreeSet, - /// Maximal allowed number of connections. - pub max_peers: usize, - pub timeouts: P2pTimeouts, + pub limits: P2pLimits, + /// Use peers discovery. pub peer_discovery: bool, @@ -86,3 +85,251 @@ impl P2pTimeouts { } } } + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, derive_more::Display)] +pub enum Limit { + #[display(fmt = "{}", _0)] + Some(T), + #[display(fmt = "unlimited")] + Unlimited, +} + +impl Limit { + pub fn map(self, f: F) -> Limit + where + F: FnOnce(T) -> O, + { + match self { + Limit::Some(t) => Limit::Some(f(t)), + Limit::Unlimited => Limit::Unlimited, + } + } +} + +macro_rules! impls { + ($ty:ty) => { + impl From> for Limit<$ty> { + fn from(value: Option<$ty>) -> Self { + match value { + Some(v) => Limit::Some(v), + None => Limit::Unlimited, + } + } + } + + impl From> for Option<$ty> { + fn from(value: Limit<$ty>) -> Self { + match value { + Limit::Some(v) => Some(v), + Limit::Unlimited => None, + } + } + } + + impl std::cmp::PartialEq<$ty> for Limit<$ty> { + fn eq(&self, other: &$ty) -> bool { + match self { + Limit::Some(v) => v.eq(other), + Limit::Unlimited => false, + } + } + } + + impl std::cmp::PartialEq> for $ty { + fn eq(&self, other: &Limit<$ty>) -> bool { + match other { + Limit::Some(other) => self.eq(other), + Limit::Unlimited => false, + } + } + } + + impl std::cmp::PartialEq> for Limit<$ty> { + fn eq(&self, other: &Limit<$ty>) -> bool { + match (self, other) { + (Limit::Some(this), Limit::Some(other)) => this.eq(other), + (Limit::Unlimited, Limit::Unlimited) => true, + _ => false, + } + } + } + + impl std::cmp::Eq for Limit<$ty> {} + + impl std::cmp::PartialOrd<$ty> for Limit<$ty> { + fn partial_cmp(&self, other: &$ty) -> Option { + match self { + Limit::Some(v) => v.partial_cmp(other), + Limit::Unlimited => Some(std::cmp::Ordering::Greater), + } + } + } + + impl std::cmp::PartialOrd> for $ty { + fn partial_cmp(&self, other: &Limit<$ty>) -> Option { + match other { + Limit::Some(other) => self.partial_cmp(other), + Limit::Unlimited => Some(std::cmp::Ordering::Less), + } + } + } + }; +} + +impls!(usize); +impls!(std::time::Duration); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct P2pLimits { + max_peers: Limit, + + identify_message: Limit, + kademlia_request: Limit, + kademlia_response: Limit, + + rpc_service_message: Limit, + rpc_query: Limit, + rpc_get_best_tip: Limit, + rpc_answer_sync_ledger_query: Limit, + rpc_get_staged_ledger: Limit, + rpc_get_transition_chain: Limit, + rpc_get_some_initial_peers: Limit, +} + +macro_rules! limit { + (#[$meta:meta] $limit:ident) => { + #[$meta] + pub fn $limit(&self) -> Limit { + self.$limit + } + }; + + (#[$meta:meta] $limit:ident, #[$setter_meta:meta] $setter:ident) => { + limit!(#[$meta] $limit); + + #[$setter_meta] + pub fn $setter>>(mut self, $limit: T) -> Self { + self.$limit = $limit.into(); + self + } + }; + + (#[$meta:meta] $limit:ident(&$self:ident): $expr:expr) => { + #[$meta] + pub fn $limit(&$self) -> Limit { + $expr + } + }; +} + +impl P2pLimits { + limit!( + /// Maximum number of peers. + max_peers, + /// Sets maximum number of peers. + with_max_peers + ); + + limit!( + /// Minimum number of peers. + min_peers(&self): self.max_peers.map(|v| (v / 2).max(3).min(v)) + ); + + limit!( + /// Maximum number of connections. + max_connections(&self): self.max_peers.map(|v| v + 10) + ); + + limit!( + /// Maximum length of Identify message. + identify_message + ); + limit!( + /// Maximum length of Kademlia request message. + kademlia_request + ); + limit!( + /// Maximum length of Kademlia response message. + kademlia_response + ); + + limit!( + #[doc = "RPC service message"] + rpc_service_message + ); + limit!( + #[doc = "RPC query"] + rpc_query + ); + limit!( + #[doc = "RPC get_best_tip"] + rpc_get_best_tip + ); + limit!( + #[doc = "RPC answer_sync_ledger_query"] + rpc_answer_sync_ledger_query + ); + limit!( + #[doc = "RPC get_staged_ledger"] + rpc_get_staged_ledger + ); + limit!( + #[doc = "RPC get_transition_chain"] + rpc_get_transition_chain + ); + limit!( + #[doc = "RPC some_initial_peers"] + rpc_get_some_initial_peers + ); +} + +impl Default for P2pLimits { + fn default() -> Self { + let max_peers = Limit::Some(100); + + let identify_message = Limit::Some(0x1000); + let kademlia_request = Limit::Some(50); + let kademlia_response = identify_message.map(|v| v * 20); // should be enough to fit 20 addresses supplied by identify + + let rpc_service_message = Limit::Some(7); // 7 for handshake, 1 for heartbeat + let rpc_query = Limit::Some(256); // max is 96 + let rpc_get_best_tip = Limit::Some(3_500_000); // 3182930 as observed, may vary + let rpc_answer_sync_ledger_query = Limit::Some(200_000); // 124823 as observed + let rpc_get_staged_ledger = Limit::Some(40_000_000); // 36371615 as observed, may vary + let rpc_get_transition_chain = Limit::Some(3_500_000); // 2979112 as observed + let rpc_get_some_initial_peers = Limit::Some(32_000); // TODO: calculate + Self { + max_peers, + + identify_message, + kademlia_request, + kademlia_response, + + rpc_service_message, + rpc_query, + rpc_get_best_tip, + rpc_answer_sync_ledger_query, + rpc_get_staged_ledger, + rpc_get_transition_chain, + rpc_get_some_initial_peers, + } + } +} + +#[cfg(test)] +mod tests { + + use super::Limit; + + #[test] + fn test_limits() { + let limit = Limit::Some(10); + assert!(0 < limit); + assert!(10 <= limit); + assert!(11 > limit); + + let unlimited = Limit::Unlimited; + assert!(0 < unlimited); + assert!(usize::MAX < unlimited); + } +} diff --git a/p2p/src/p2p_reducer.rs b/p2p/src/p2p_reducer.rs index a272d407e3..5a9e6b1daf 100644 --- a/p2p/src/p2p_reducer.rs +++ b/p2p/src/p2p_reducer.rs @@ -113,7 +113,9 @@ impl P2pState { } } }, - P2pAction::Network(action) => self.network.reducer(meta.with_action(action)), + P2pAction::Network(action) => self + .network + .reducer(meta.with_action(action), &self.config.limits), } } } diff --git a/p2p/src/p2p_state.rs b/p2p/src/p2p_state.rs index ec6e012825..42c5982663 100644 --- a/p2p/src/p2p_state.rs +++ b/p2p/src/p2p_state.rs @@ -12,7 +12,7 @@ use crate::connection::incoming::P2pConnectionIncomingState; use crate::connection::outgoing::{P2pConnectionOutgoingInitOpts, P2pConnectionOutgoingState}; use crate::network::identify::P2pNetworkIdentify; use crate::network::P2pNetworkState; -use crate::{is_time_passed, P2pTimeouts, PeerId}; +use crate::{is_time_passed, Limit, P2pTimeouts, PeerId}; use super::connection::P2pConnectionState; use super::P2pConfig; @@ -195,21 +195,21 @@ impl P2pState { } pub fn already_has_min_peers(&self) -> bool { - self.connected_or_connecting_peers_count() >= self.min_peers() + self.connected_or_connecting_peers_count() >= self.config.limits.min_peers() } pub fn already_has_max_peers(&self) -> bool { - self.connected_or_connecting_peers_count() >= self.config.max_peers + self.connected_or_connecting_peers_count() >= self.config.limits.max_peers() } /// The peers capacity is exceeded. pub fn already_has_max_ready_peers(&self) -> bool { - self.ready_peers_iter().count() >= self.config.max_peers + self.ready_peers_iter().count() >= self.config.limits.max_peers() } /// Minimal number of peers that the node should connect - pub fn min_peers(&self) -> usize { - (self.config.max_peers / 2).max(3) + pub fn min_peers(&self) -> Limit { + self.config.limits.min_peers() } /// Peer with libp2p connection identified by `conn_id`. diff --git a/p2p/testing/Cargo.toml b/p2p/testing/Cargo.toml index b701832cc2..e273d04197 100644 --- a/p2p/testing/Cargo.toml +++ b/p2p/testing/Cargo.toml @@ -17,8 +17,6 @@ futures = "0.3.30" rand = "0.8.5" derive_more = "0.99.17" thiserror = "1.0.57" -derive-getters = "0.3.0" -derive_builder = "0.20.0" openmina-core = { path = "../../core" } pin-project-lite = "0.2" lazy_static = "1.4.0" diff --git a/p2p/testing/src/cluster.rs b/p2p/testing/src/cluster.rs index fe523b3239..eaf5173dc8 100644 --- a/p2p/testing/src/cluster.rs +++ b/p2p/testing/src/cluster.rs @@ -6,7 +6,6 @@ use std::{ time::{Duration, Instant}, }; -use derive_builder::UninitializedFieldError; use futures::StreamExt; use libp2p::{multiaddr::multiaddr, swarm::DialError, Multiaddr}; use openmina_core::{ChainId, BERKELEY_CHAIN_ID}; @@ -101,7 +100,7 @@ impl Default for ClusterBuilder { ports: None, ip: Ipv4Addr::LOCALHOST.into(), idle_duration: Duration::from_millis(100), - is_error: |_| false, + is_error: super::event::is_error, total_duration: Duration::from_secs(60), } } @@ -158,7 +157,7 @@ impl ClusterBuilder { let chain_id = self.chain_id; let ports = self .ports - .ok_or_else(|| UninitializedFieldError::new("ports"))? + .ok_or_else(|| Error::UninitializedField("ports"))? .ports() .await?; let ip = self.ip; @@ -267,8 +266,8 @@ pub enum Error { NoMorePorts, #[error(transparent)] AddrParse(#[from] P2pConnectionOutgoingInitOptsParseError), - #[error(transparent)] - UninitializedField(#[from] UninitializedFieldError), + #[error("uninitialized field `{0}`")] + UninitializedField(&'static str), #[error("swarm creation error: {0}")] Libp2pSwarm(String), #[error(transparent)] @@ -357,9 +356,9 @@ impl Cluster { initial_peers, ask_initial_peers_interval: Duration::from_secs(5), enabled_channels: p2p::channels::ChannelId::for_libp2p().collect(), - max_peers: 100, peer_discovery: config.discovery, timeouts: config.timeouts, + limits: config.limits, initial_time: Duration::ZERO, }; diff --git a/p2p/testing/src/event.rs b/p2p/testing/src/event.rs index ebd8365b3a..fb04e2892c 100644 --- a/p2p/testing/src/event.rs +++ b/p2p/testing/src/event.rs @@ -7,9 +7,11 @@ use p2p::{ identify::P2pIdentifyAction, network::identify::P2pNetworkIdentify, peer::P2pPeerAction, - P2pAction, P2pEvent, PeerId, + MioEvent, P2pAction, P2pEvent, PeerId, }; +use crate::cluster::ClusterEvent; + #[derive(Debug)] pub enum RustNodeEvent { Interface { @@ -188,3 +190,51 @@ pub(super) fn event_mapper_effect(store: &mut super::redux::Store, action: P2pAc _ => {} } } + +pub fn is_error(event: &ClusterEvent) -> bool { + let ClusterEvent::Rust { event, .. } = event else { + return false; + }; + match event { + RustNodeEvent::ListenerError { .. } => true, + RustNodeEvent::PeerConnectionError { .. } => true, + RustNodeEvent::PeerDisconnected { .. } => true, + RustNodeEvent::P2p { event } => match event { + P2pEvent::Connection(_event) => false, // TODO + P2pEvent::Channel(_event) => false, // TODO + P2pEvent::MioEvent(event) => matches!( + event, + MioEvent::ListenerError { .. } + | MioEvent::IncomingConnectionDidAccept(_, Err(_)) + | MioEvent::IncomingDataDidReceive(_, Err(_)) + | MioEvent::OutgoingConnectionDidConnect(_, Err(_)) + | MioEvent::OutgoingDataDidSend(_, Err(_)) + | MioEvent::ConnectionDidClose(_, Err(_)) + ), + }, + _ => false, + } +} + +pub fn allow_disconnections(event: &ClusterEvent) -> bool { + let ClusterEvent::Rust { event, .. } = event else { + return false; + }; + match event { + RustNodeEvent::ListenerError { .. } => true, + RustNodeEvent::PeerConnectionError { .. } => false, + RustNodeEvent::PeerDisconnected { .. } => true, + RustNodeEvent::P2p { event } => match event { + P2pEvent::Connection(_event) => false, // TODO + P2pEvent::Channel(_event) => false, // TODO + P2pEvent::MioEvent(event) => matches!( + event, + MioEvent::ListenerError { .. } | MioEvent::IncomingConnectionDidAccept(_, Err(_)) // | MioEvent::IncomingDataDidReceive(_, Err(_)) + // | MioEvent::OutgoingConnectionDidConnect(_, Err(_)) + // | MioEvent::OutgoingDataDidSend(_, Err(_)) + // | MioEvent::ConnectionDidClose(_, Err(_)) + ), + }, + _ => false, + } +} diff --git a/p2p/testing/src/rust_node.rs b/p2p/testing/src/rust_node.rs index e87a17a996..b89cbfa5ac 100644 --- a/p2p/testing/src/rust_node.rs +++ b/p2p/testing/src/rust_node.rs @@ -5,7 +5,7 @@ use std::{ }; use futures::Stream; -use p2p::{P2pAction, P2pEvent, P2pState, P2pTimeouts, PeerId}; +use p2p::{P2pAction, P2pEvent, P2pLimits, P2pState, P2pTimeouts, PeerId}; use redux::{EnablingCondition, SubStore}; use tokio::sync::mpsc; @@ -25,6 +25,7 @@ pub struct RustNodeConfig { pub peer_id: PeerIdConfig, pub initial_peers: Vec, pub timeouts: P2pTimeouts, + pub limits: P2pLimits, pub discovery: bool, } @@ -47,6 +48,11 @@ impl RustNodeConfig { self } + pub fn with_limits(mut self, limits: P2pLimits) -> Self { + self.limits = limits; + self + } + pub fn with_discovery(mut self, discovery: bool) -> Self { self.discovery = discovery; self diff --git a/p2p/tests/connection.rs b/p2p/tests/connection.rs index ad1217e70b..b3e47e0fba 100644 --- a/p2p/tests/connection.rs +++ b/p2p/tests/connection.rs @@ -3,6 +3,7 @@ use std::time::Duration; use p2p::PeerId; use p2p_testing::{ cluster::{Cluster, ClusterBuilder, NodeId}, + event::allow_disconnections, libp2p_node::Libp2pNodeConfig, rust_node::{RustNodeConfig, RustNodeId}, utils::{ @@ -133,6 +134,7 @@ async fn mutual_rust_to_rust() -> anyhow::Result<()> { let mut cluster = ClusterBuilder::default() .ports_with_len(10) .total_duration(Duration::from_secs(10)) + .is_error(allow_disconnections) .start() .await?; @@ -171,6 +173,7 @@ async fn mutual_rust_to_rust_many() -> anyhow::Result<()> { let mut cluster = ClusterBuilder::default() .ports_with_len(NUM * 2) .total_duration(Duration::from_secs(60)) + .is_error(allow_disconnections) .start() .await?; diff --git a/p2p/tests/kademlia.rs b/p2p/tests/kademlia.rs index 8a8ec93500..19fdd18e00 100644 --- a/p2p/tests/kademlia.rs +++ b/p2p/tests/kademlia.rs @@ -1,7 +1,7 @@ use p2p::{identity::SecretKey, P2pNetworkKadBucket, PeerId}; use p2p_testing::{ cluster::{Cluster, ClusterBuilder, ClusterEvent, Listener}, - event::RustNodeEvent, + event::{allow_disconnections, RustNodeEvent}, futures::TryStreamExt, predicates::kad_finished_bootstrap, rust_node::{RustNodeConfig, RustNodeId}, @@ -186,6 +186,7 @@ async fn bootstrap_no_peers() -> anyhow::Result<()> { let mut cluster = ClusterBuilder::new() .ports_with_len(3) .idle_duration(Duration::from_millis(100)) + .is_error(allow_disconnections) .start() .await?; @@ -221,6 +222,7 @@ async fn bootstrap_no_peers() -> anyhow::Result<()> { /// A node should be able to discover and connect a node connected to the seed node. #[tokio::test] async fn discovery_seed_single_peer() -> anyhow::Result<()> { + std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", false.to_string()); let mut cluster = ClusterBuilder::new() .ports_with_len(6) .idle_duration(Duration::from_millis(100)) @@ -254,15 +256,15 @@ async fn discovery_seed_single_peer() -> anyhow::Result<()> { #[tokio::test] async fn discovery_seed_multiple_peers() -> anyhow::Result<()> { + std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", false.to_string()); const PEERS: usize = 10; let mut cluster = ClusterBuilder::new() .ports_with_len(PEERS as u16 * 2 + 4) .idle_duration(Duration::from_millis(100)) + .is_error(allow_disconnections) .start() .await?; - std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", false.to_string()); - let [seed, nodes @ ..]: [_; PEERS + 1] = p2p_testing::utils::rust_nodes_from_config(&mut cluster, rust_config())?; let peer_ids = peer_ids(&cluster, nodes);