From 03d65ccbb6203322b609f4d664bb23eec866988d Mon Sep 17 00:00:00 2001 From: Alexander Koptelov Date: Wed, 29 May 2024 17:58:18 +0300 Subject: [PATCH 1/5] task(p2p): introduce limits for kademlia messages --- cli/src/commands/node/mod.rs | 5 +- node/testing/src/cluster/mod.rs | 5 +- .../p2p/basic_connection_handling.rs | 7 +- p2p/src/disconnection/mod.rs | 10 +- p2p/src/identity/peer_id.rs | 2 +- .../identify/p2p_network_identify_reducer.rs | 7 +- .../p2p_network_identify_stream_reducer.rs | 8 +- .../network/kad/p2p_network_kad_internals.rs | 2 +- .../network/kad/p2p_network_kad_protocol.rs | 6 +- .../network/kad/p2p_network_kad_reducer.rs | 12 +- .../stream/p2p_network_kad_stream_effects.rs | 18 ++- .../stream/p2p_network_kad_stream_reducer.rs | 44 +++++-- .../stream/p2p_network_kad_stream_state.rs | 29 ++++- p2p/src/network/mod.rs | 14 +++ p2p/src/network/p2p_network_reducer.rs | 18 ++- .../p2p_network_scheduler_effects.rs | 2 +- .../p2p_network_scheduler_reducer.rs | 12 +- .../scheduler/p2p_network_scheduler_state.rs | 2 +- p2p/src/p2p_config.rs | 115 ++++++++++++++++++ p2p/src/p2p_reducer.rs | 4 +- p2p/testing/src/cluster.rs | 3 +- p2p/testing/src/event.rs | 52 +++++++- p2p/testing/src/rust_node.rs | 8 +- p2p/tests/connection.rs | 3 + p2p/tests/kademlia.rs | 8 +- 25 files changed, 337 insertions(+), 59 deletions(-) diff --git a/cli/src/commands/node/mod.rs b/cli/src/commands/node/mod.rs index 61dde0b009..08ca356505 100644 --- a/cli/src/commands/node/mod.rs +++ b/cli/src/commands/node/mod.rs @@ -26,7 +26,7 @@ use node::p2p::channels::ChannelId; use node::p2p::connection::outgoing::P2pConnectionOutgoingInitOpts; use node::p2p::identity::SecretKey; use node::p2p::service_impl::webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p; -use node::p2p::{P2pConfig, P2pTimeouts}; +use node::p2p::{P2pConfig, P2pLimits, P2pTimeouts}; use node::service::{Recorder, Service}; use node::snark::{get_srs, get_verifier_index, VerifierKind}; use node::stats::Stats; @@ -219,11 +219,12 @@ impl Node { max_peers: 100, ask_initial_peers_interval: Duration::from_secs(3600), enabled_channels: ChannelId::for_libp2p().collect(), - timeouts: P2pTimeouts::default(), peer_discovery: !self.no_peers_discovery, initial_time: SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("linear time"), + timeouts: P2pTimeouts::default(), + limits: P2pLimits::default(), }, transition_frontier, block_producer: block_producer.clone().map(|(config, _)| config), diff --git a/node/testing/src/cluster/mod.rs b/node/testing/src/cluster/mod.rs index f3df817dec..388a78be6a 100644 --- a/node/testing/src/cluster/mod.rs +++ b/node/testing/src/cluster/mod.rs @@ -26,7 +26,7 @@ use node::core::warn; use node::p2p::service_impl::{ webrtc::P2pServiceCtx, webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p, }; -use node::p2p::{P2pConnectionEvent, P2pEvent, PeerId}; +use node::p2p::{P2pConnectionEvent, P2pEvent, P2pLimits, PeerId}; use node::snark::{VerifierIndex, VerifierSRS}; use node::{ event_source::Event, @@ -281,8 +281,9 @@ impl Cluster { max_peers: testing_config.max_peers, ask_initial_peers_interval: testing_config.ask_initial_peers_interval, enabled_channels: ChannelId::iter_all().collect(), - timeouts: testing_config.timeouts, peer_discovery: true, + timeouts: testing_config.timeouts, + limits: P2pLimits::default(), initial_time: testing_config .initial_time .checked_sub(redux::Timestamp::ZERO) diff --git a/node/testing/src/scenarios/p2p/basic_connection_handling.rs b/node/testing/src/scenarios/p2p/basic_connection_handling.rs index 12532fe8a9..d1c775c7f4 100644 --- a/node/testing/src/scenarios/p2p/basic_connection_handling.rs +++ b/node/testing/src/scenarios/p2p/basic_connection_handling.rs @@ -247,7 +247,10 @@ impl MaxNumberOfPeersIncoming { // check that the number of ready peers does not exceed the maximal allowed number let state = driver.inner().node(node_ut).unwrap().state(); let count = state.p2p.ready_peers_iter().count(); - assert!(count <= MAX.into(), "max number of peers exceeded: {count}"); + assert!( + count <= usize::from(MAX), + "max number of peers exceeded: {count}" + ); // check that the number of nodes with the node as their peer does not exceed the maximal allowed number let peers_connected = || { @@ -263,7 +266,7 @@ impl MaxNumberOfPeersIncoming { }) }; assert!( - peers_connected().count() <= MAX.into(), + peers_connected().count() <= usize::from(MAX), "peers connections to the node exceed the max number of connections: {}", peers_connected().count() ); diff --git a/p2p/src/disconnection/mod.rs b/p2p/src/disconnection/mod.rs index 2c87c825e2..8cb9de165a 100644 --- a/p2p/src/disconnection/mod.rs +++ b/p2p/src/disconnection/mod.rs @@ -11,9 +11,9 @@ pub use p2p_disconnection_service::*; use serde::{Deserialize, Serialize}; -use crate::{channels::ChannelId, connection::RejectionReason}; +use crate::{channels::ChannelId, connection::RejectionReason, P2pNetworkError}; -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, thiserror::Error)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, thiserror::Error)] pub enum P2pDisconnectionReason { #[error("message is unexpected for channel {0}")] P2pChannelMsgUnexpected(ChannelId), @@ -35,9 +35,9 @@ pub enum P2pDisconnectionReason { #[error("duplicate connection")] DuplicateConnection, - #[error("select error")] - SelectError, - #[error("timeout")] Timeout, + + #[error("network error: {0}")] + NetworkError(#[from] P2pNetworkError), } diff --git a/p2p/src/identity/peer_id.rs b/p2p/src/identity/peer_id.rs index ecb1d96296..c9f63c2046 100644 --- a/p2p/src/identity/peer_id.rs +++ b/p2p/src/identity/peer_id.rs @@ -63,7 +63,7 @@ impl fmt::Debug for PeerId { } } -#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, thiserror::Error, Serialize, Deserialize)] pub enum PeerIdFromLibp2pPeerId { #[error("error decoding public key from protobuf: {0}")] Protobuf(String), diff --git a/p2p/src/network/identify/p2p_network_identify_reducer.rs b/p2p/src/network/identify/p2p_network_identify_reducer.rs index 166f2235b9..890356d03d 100644 --- a/p2p/src/network/identify/p2p_network_identify_reducer.rs +++ b/p2p/src/network/identify/p2p_network_identify_reducer.rs @@ -1,11 +1,14 @@ use redux::ActionWithMeta; +use crate::P2pLimits; + use super::stream::P2pNetworkIdentifyStreamAction; impl super::P2pNetworkIdentifyState { pub fn reducer( &mut self, action: ActionWithMeta<&super::P2pNetworkIdentifyAction>, + limits: &P2pLimits, ) -> Result<(), String> { let (action, meta) = action.split(); match action { @@ -16,7 +19,7 @@ impl super::P2pNetworkIdentifyState { .map_err(|stream| { format!("Identify stream already exists for action {action:?}: {stream:?}") }) - .and_then(|stream| stream.reducer(meta.with_action(action))), + .and_then(|stream| stream.reducer(meta.with_action(action), limits)), super::P2pNetworkIdentifyAction::Stream( action @ P2pNetworkIdentifyStreamAction::Prune { .. }, ) => self @@ -26,7 +29,7 @@ impl super::P2pNetworkIdentifyState { super::P2pNetworkIdentifyAction::Stream(action) => self .find_identify_stream_state_mut(action.peer_id(), action.stream_id()) .ok_or_else(|| format!("Identify stream not found for action {action:?}")) - .and_then(|stream| stream.reducer(meta.with_action(action))), + .and_then(|stream| stream.reducer(meta.with_action(action), limits)), } } } diff --git a/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs b/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs index 8fc6b9cd35..4692672f3b 100644 --- a/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs +++ b/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs @@ -1,7 +1,10 @@ use super::{ P2pNetworkIdentifyStreamAction, P2pNetworkIdentifyStreamKind, P2pNetworkIdentifyStreamState, }; -use crate::network::identify::{pb::Identify, P2pNetworkIdentify}; +use crate::{ + network::identify::{pb::Identify, P2pNetworkIdentify}, + P2pLimits, +}; use prost::Message; use quick_protobuf::BytesReader; use redux::ActionWithMeta; @@ -10,6 +13,7 @@ impl P2pNetworkIdentifyStreamState { pub fn reducer( &mut self, action: ActionWithMeta<&P2pNetworkIdentifyStreamAction>, + limits: &P2pLimits, ) -> Result<(), String> { use super::P2pNetworkIdentifyStreamAction as A; use super::P2pNetworkIdentifyStreamState as S; @@ -43,7 +47,7 @@ impl P2pNetworkIdentifyStreamState { }; // TODO: implement as configuration option - if len > 0x1000 { + if len > limits.identify_message { *self = S::Error(format!("Identify message is too long ({})", len)); return Ok(()); } diff --git a/p2p/src/network/kad/p2p_network_kad_internals.rs b/p2p/src/network/kad/p2p_network_kad_internals.rs index 95954b57a5..124930a0ef 100644 --- a/p2p/src/network/kad/p2p_network_kad_internals.rs +++ b/p2p/src/network/kad/p2p_network_kad_internals.rs @@ -369,7 +369,7 @@ impl P2pNetworkKadEntry { } } -#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)] +#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, thiserror::Error)] pub enum P2pNetworkKadEntryTryFromError { #[error(transparent)] PeerId(#[from] P2pNetworkKademliaPeerIdError), diff --git a/p2p/src/network/kad/p2p_network_kad_protocol.rs b/p2p/src/network/kad/p2p_network_kad_protocol.rs index 1b97ff6d77..2fdf56238a 100644 --- a/p2p/src/network/kad/p2p_network_kad_protocol.rs +++ b/p2p/src/network/kad/p2p_network_kad_protocol.rs @@ -57,7 +57,7 @@ impl P2pNetworkKademliaRpcRequest { } } -#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)] +#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, thiserror::Error)] pub enum P2pNetworkKademliaPeerIdError { #[error("error decoding PeerId from bytes: lenght {0} while expected 32")] Parse(String), @@ -99,7 +99,7 @@ pub enum P2pNetworkKademliaRpcPeerTryFromError { Multiaddr(#[from] P2pNetworkKademliaMultiaddrError), } -#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, thiserror::Error)] #[error("error decoding Multiaddr from bytes: {0}")] pub struct P2pNetworkKademliaMultiaddrError(String); @@ -115,7 +115,7 @@ impl From for P2pNetworkKademliaMultiaddrError { } } -#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, thiserror::Error)] pub enum P2pNetworkKademliaRpcFromMessageError { #[error(transparent)] PeerId(#[from] P2pNetworkKademliaPeerIdError), diff --git a/p2p/src/network/kad/p2p_network_kad_reducer.rs b/p2p/src/network/kad/p2p_network_kad_reducer.rs index 3e66335f3e..e420c7a597 100644 --- a/p2p/src/network/kad/p2p_network_kad_reducer.rs +++ b/p2p/src/network/kad/p2p_network_kad_reducer.rs @@ -1,13 +1,17 @@ use redux::ActionWithMeta; -use crate::P2pNetworkKadEntry; +use crate::{P2pLimits, P2pNetworkKadEntry}; use super::{P2pNetworkKadAction, P2pNetworkKadLatestRequestPeerKind, P2pNetworkKadStatus}; use super::stream::P2pNetworkKademliaStreamAction; impl super::P2pNetworkKadState { - pub fn reducer(&mut self, action: ActionWithMeta<&P2pNetworkKadAction>) -> Result<(), String> { + pub fn reducer( + &mut self, + action: ActionWithMeta<&P2pNetworkKadAction>, + limits: &P2pLimits, + ) -> Result<(), String> { let (action, meta) = action.split(); match action { P2pNetworkKadAction::System(action) => self.system_reducer(meta.with_action(action)), @@ -48,7 +52,7 @@ impl super::P2pNetworkKadState { .map_err(|stream| { format!("kademlia stream already exists for action {action:?}: {stream:?}") }) - .and_then(|stream| stream.reducer(meta.with_action(action))), + .and_then(|stream| stream.reducer(meta.with_action(action), limits)), P2pNetworkKadAction::Stream(action @ P2pNetworkKademliaStreamAction::Prune { .. }) => { self.remove_kad_stream_state(action.peer_id(), action.stream_id()) .then_some(()) @@ -57,7 +61,7 @@ impl super::P2pNetworkKadState { P2pNetworkKadAction::Stream(action) => self .find_kad_stream_state_mut(action.peer_id(), action.stream_id()) .ok_or_else(|| format!("kademlia stream not found for action {action:?}")) - .and_then(|stream| stream.reducer(meta.with_action(action))), + .and_then(|stream| stream.reducer(meta.with_action(action), limits)), } } diff --git a/p2p/src/network/kad/stream/p2p_network_kad_stream_effects.rs b/p2p/src/network/kad/stream/p2p_network_kad_stream_effects.rs index f237d49fa8..caf557caba 100644 --- a/p2p/src/network/kad/stream/p2p_network_kad_stream_effects.rs +++ b/p2p/src/network/kad/stream/p2p_network_kad_stream_effects.rs @@ -1,7 +1,11 @@ use openmina_core::warn; use redux::ActionMeta; -use crate::{Data, P2pNetworkKademliaAction, P2pNetworkYamuxAction}; +use crate::{ + disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, + stream::{P2pNetworkKadIncomingStreamError, P2pNetworkKadOutgoingStreamError}, + Data, P2pNetworkKademliaAction, P2pNetworkSchedulerAction, P2pNetworkYamuxAction, +}; use super::{ super::{P2pNetworkKademliaRpcReply, P2pNetworkKademliaRpcRequest}, @@ -179,7 +183,17 @@ impl P2pNetworkKademliaStreamAction { Ok(()) } (action, D::Incoming(I::Error(err)) | D::Outgoing(O::Error(err))) => { - warn!(meta.time(); summary = "error handling kademlia action", error = err, action = format!("{action:?}")); + warn!(meta.time(); summary = "error handling kademlia action", error = display(err)); + let error = match state { + D::Incoming(_) => P2pNetworkKadIncomingStreamError::from(err.clone()).into(), + D::Outgoing(_) => P2pNetworkKadOutgoingStreamError::from(err.clone()).into(), + }; + let peer_id = *action.peer_id(); + store.dispatch(P2pDisconnectionAction::Init { + peer_id, + reason: P2pDisconnectionReason::NetworkError(error), + }); + store.dispatch(P2pNetworkSchedulerAction::PruneStreams { peer_id }); Ok(()) } (action, _) => Err(format!("incorrect state {state:?} for action {action:?}")), diff --git a/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs b/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs index 3d3cd79e26..661d381d25 100644 --- a/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs +++ b/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs @@ -1,7 +1,10 @@ use quick_protobuf::{serialize_into_vec, BytesReader}; use redux::ActionWithMeta; -use crate::{P2pNetworkKademliaRpcReply, P2pNetworkKademliaRpcRequest}; +use crate::{ + stream::P2pNetworkKadStreamError, P2pLimits, P2pNetworkKademliaRpcReply, + P2pNetworkKademliaRpcRequest, +}; use super::{ super::Message, P2pNetworkKadIncomingStreamState, P2pNetworkKadOutgoingStreamState, @@ -12,6 +15,7 @@ impl P2pNetworkKadIncomingStreamState { pub fn reducer( &mut self, action: ActionWithMeta<&P2pNetworkKademliaStreamAction>, + limits: &P2pLimits, ) -> Result<(), String> { use super::P2pNetworkKadIncomingStreamState as S; use super::P2pNetworkKademliaStreamAction as A; @@ -30,10 +34,19 @@ impl P2pNetworkKadIncomingStreamState { let mut reader = BytesReader::from_bytes(data); let Ok(len) = reader.read_varint32(data).map(|v| v as usize) else { - *self = S::Error("error reading message length".to_owned()); + *self = S::Error(P2pNetworkKadStreamError::MessageLength); return Ok(()); }; + println!("=== kademlia request len: {len}"); + if len > limits.kademlia_request { + *self = S::Error(P2pNetworkKadStreamError::Limit( + len, + limits.kademlia_request, + )); + return Ok(()); + } + if len > reader.len() { *self = S::PartialRequestReceived { len, @@ -87,7 +100,7 @@ impl P2pNetworkKadIncomingStreamState { let message = match reader.read_message_by_len::(data, len) { Ok(v) => v, Err(e) => { - *self = Error(format!("error reading protobuf message: {e}")); + *self = Error(P2pNetworkKadStreamError::Message(e.to_string())); return Ok(()); } }; @@ -95,7 +108,7 @@ impl P2pNetworkKadIncomingStreamState { let data = match P2pNetworkKademliaRpcRequest::try_from(message.clone()) { Ok(v) => v, Err(e) => { - *self = Error(format!("error converting protobuf message: {e}")); + *self = Error(e.into()); return Ok(()); } }; @@ -109,12 +122,11 @@ impl P2pNetworkKadOutgoingStreamState { pub fn reducer( &mut self, action: ActionWithMeta<&P2pNetworkKademliaStreamAction>, + limits: &P2pLimits, ) -> Result<(), String> { use super::P2pNetworkKadOutgoingStreamState as S; use super::P2pNetworkKademliaStreamAction as A; let (action, _meta) = action.split(); - // println!("=== state: {self:?}"); - // println!("=== action: {action:?}"); match (&self, action) { (S::Default, A::New { incoming, .. }) if !*incoming => { *self = S::WaitingForRequest { @@ -139,10 +151,19 @@ impl P2pNetworkKadOutgoingStreamState { let mut reader = BytesReader::from_bytes(data); let Ok(len) = reader.read_varint32(data).map(|v| v as usize) else { - *self = S::Error("error reading message length".to_owned()); + *self = S::Error(P2pNetworkKadStreamError::MessageLength); return Ok(()); }; + println!("=== kademlia response len: {len}"); + if len > limits.kademlia_response { + *self = S::Error(P2pNetworkKadStreamError::Limit( + len, + limits.kademlia_response, + )); + return Ok(()); + } + if len > reader.len() { *self = S::PartialReplyReceived { len, @@ -190,7 +211,7 @@ impl P2pNetworkKadOutgoingStreamState { let message = match reader.read_message_by_len::(data, len) { Ok(v) => v, Err(e) => { - *self = Error(format!("error reading protobuf message: {e}")); + *self = Error(P2pNetworkKadStreamError::Message(e.to_string())); return Ok(()); } }; @@ -198,7 +219,7 @@ impl P2pNetworkKadOutgoingStreamState { let data = match P2pNetworkKademliaRpcReply::try_from(message.clone()) { Ok(v) => v, Err(e) => { - *self = Error(format!("error converting protobuf message: {e}")); + *self = Error(e.into()); return Ok(()); } }; @@ -212,10 +233,11 @@ impl P2pNetworkKadStreamState { pub fn reducer( &mut self, action: ActionWithMeta<&P2pNetworkKademliaStreamAction>, + limits: &P2pLimits, ) -> Result<(), String> { match self { - P2pNetworkKadStreamState::Incoming(i) => i.reducer(action), - P2pNetworkKadStreamState::Outgoing(o) => o.reducer(action), + P2pNetworkKadStreamState::Incoming(i) => i.reducer(action, limits), + P2pNetworkKadStreamState::Outgoing(o) => o.reducer(action, limits), } } } diff --git a/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs b/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs index 79e434b3e6..ec8a357382 100644 --- a/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs +++ b/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs @@ -1,6 +1,9 @@ use serde::{Deserialize, Serialize}; -use crate::{P2pNetworkKademliaRpcReply, P2pNetworkKademliaRpcRequest}; +use crate::{ + Limit, P2pNetworkKademliaRpcFromMessageError, P2pNetworkKademliaRpcReply, + P2pNetworkKademliaRpcRequest, +}; #[derive(Clone, Debug, Serialize, Deserialize)] pub enum P2pNetworkKadStreamState { @@ -39,7 +42,7 @@ pub enum P2pNetworkKadIncomingStreamState { Closed, /// Error handling the stream. /// TODO: use enum for errors. - Error(String), + Error(P2pNetworkKadStreamError), } #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub enum P2pNetworkKadOutgoingStreamState { @@ -61,5 +64,25 @@ pub enum P2pNetworkKadOutgoingStreamState { Closed, /// Error handling the stream. /// TODO: use enum for errors. - Error(String), + Error(P2pNetworkKadStreamError), } + +#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] +pub enum P2pNetworkKadStreamError { + #[error("error reading message length")] + MessageLength, + #[error("message is too long: {0} exceeds {1}")] + Limit(usize, Limit), + #[error("error reading message: {0}")] + Message(String), + #[error("error converting protobuf message: {0}")] + Convert(#[from] P2pNetworkKademliaRpcFromMessageError), +} + +#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] +#[error("kademlia incoming stream: {0}")] +pub struct P2pNetworkKadIncomingStreamError(#[from] P2pNetworkKadStreamError); + +#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] +#[error("kademlia incoming stream: {0}")] +pub struct P2pNetworkKadOutgoingStreamError(#[from] P2pNetworkKadStreamError); diff --git a/p2p/src/network/mod.rs b/p2p/src/network/mod.rs index c3ba700fc9..e27829c89a 100644 --- a/p2p/src/network/mod.rs +++ b/p2p/src/network/mod.rs @@ -1,4 +1,7 @@ mod p2p_network_actions; +use serde::Deserialize; +use serde::Serialize; + pub use self::p2p_network_actions::*; mod p2p_network_service; @@ -24,6 +27,7 @@ pub mod noise; pub use self::noise::*; pub mod yamux; +use self::stream::{P2pNetworkKadIncomingStreamError, P2pNetworkKadOutgoingStreamError}; pub use self::yamux::*; pub mod identify; @@ -162,3 +166,13 @@ mod data { } } } + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, thiserror::Error)] +pub enum P2pNetworkError { + #[error("select error")] + SelectError, + #[error(transparent)] + KademliaIncomingStreamError(#[from] P2pNetworkKadIncomingStreamError), + #[error(transparent)] + KademliaOutgoingStreamError(#[from] P2pNetworkKadOutgoingStreamError), +} diff --git a/p2p/src/network/p2p_network_reducer.rs b/p2p/src/network/p2p_network_reducer.rs index 9b95d1c719..4964be088c 100644 --- a/p2p/src/network/p2p_network_reducer.rs +++ b/p2p/src/network/p2p_network_reducer.rs @@ -1,7 +1,7 @@ use multiaddr::Multiaddr; use openmina_core::{error, ChainId}; -use crate::{identity::PublicKey, PeerId}; +use crate::{identity::PublicKey, P2pLimits, PeerId}; use super::*; @@ -47,7 +47,11 @@ impl P2pNetworkState { } impl P2pNetworkState { - pub fn reducer(&mut self, action: redux::ActionWithMeta<&P2pNetworkAction>) { + pub fn reducer( + &mut self, + action: redux::ActionWithMeta<&P2pNetworkAction>, + limits: &P2pLimits, + ) { let (action, meta) = action.split(); match action { P2pNetworkAction::Scheduler(a) => self.scheduler.reducer(meta.with_action(a)), @@ -88,7 +92,11 @@ impl P2pNetworkState { P2pNetworkAction::Identify(a) => { let time = meta.time(); // println!("======= identify reducer for {state:?}"); - if let Err(err) = self.scheduler.identify_state.reducer(meta.with_action(a)) { + if let Err(err) = self + .scheduler + .identify_state + .reducer(meta.with_action(a), limits) + { error!(time; "{err}"); } // println!("======= identify reducer result {state:?}"); @@ -99,11 +107,9 @@ impl P2pNetworkState { return; }; let time = meta.time(); - // println!("======= kad reducer for {state:?}"); - if let Err(err) = state.reducer(meta.with_action(a)) { + if let Err(err) = state.reducer(meta.with_action(a), limits) { error!(time; "{err}"); } - // println!("======= kad reducer result {state:?}"); } P2pNetworkAction::Pubsub(a) => { self.scheduler.broadcast_state.reducer(meta.with_action(a)) diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs b/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs index 03015804f6..0767870996 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs @@ -322,7 +322,7 @@ impl P2pNetworkSchedulerAction { if !matches!( reason, P2pNetworkConnectionCloseReason::Disconnect( - P2pDisconnectionReason::SelectError + P2pDisconnectionReason::NetworkError(P2pNetworkError::SelectError) ) ) && reason.is_disconnected() { diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs b/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs index a466840611..d4987cead7 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs @@ -2,8 +2,6 @@ use std::collections::BTreeMap; use openmina_core::error; -use crate::disconnection::P2pDisconnectionReason; - use super::{super::*, p2p_network_scheduler_state::P2pNetworkConnectionState, *}; impl P2pNetworkSchedulerState { @@ -142,10 +140,16 @@ impl P2pNetworkSchedulerState { } connection.closed = Some(P2pNetworkConnectionCloseReason::Disconnect( - P2pDisconnectionReason::SelectError, + P2pNetworkError::SelectError.into(), )); } else { - unreachable!() + if let Some(connection) = self.connections.get_mut(addr) { + connection.closed = Some(P2pNetworkConnectionCloseReason::Disconnect( + P2pNetworkError::SelectError.into(), + )); + } else { + unreachable!() + } } } P2pNetworkSchedulerAction::YamuxDidInit { addr, .. } => { diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_state.rs b/p2p/src/network/scheduler/p2p_network_scheduler_state.rs index 2a5c9a425e..0016405b2f 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_state.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_state.rs @@ -77,7 +77,7 @@ impl P2pNetworkConnectionState { } } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, thiserror::Error)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, thiserror::Error)] pub enum P2pNetworkConnectionCloseReason { #[error("peer is disconnected: {0}")] Disconnect(#[from] P2pDisconnectionReason), diff --git a/p2p/src/p2p_config.rs b/p2p/src/p2p_config.rs index 069d207e70..df21db45b4 100644 --- a/p2p/src/p2p_config.rs +++ b/p2p/src/p2p_config.rs @@ -28,6 +28,8 @@ pub struct P2pConfig { pub timeouts: P2pTimeouts, + pub limits: P2pLimits, + /// Use peers discovery. pub peer_discovery: bool, @@ -86,3 +88,116 @@ impl P2pTimeouts { } } } + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, derive_more::Display)] +pub enum Limit { + #[display(fmt = "{}", _0)] + Some(T), + #[display(fmt = "unlimited")] + Unlimited, +} + +impl Limit { + pub fn map(self, f: F) -> Limit + where + F: FnOnce(T) -> O, + { + match self { + Limit::Some(t) => Limit::Some(f(t)), + Limit::Unlimited => Limit::Unlimited, + } + } +} + +macro_rules! impls { + ($ty:ty) => { + impl std::cmp::PartialEq<$ty> for Limit<$ty> { + fn eq(&self, other: &$ty) -> bool { + match self { + Limit::Some(v) => v.eq(other), + Limit::Unlimited => false, + } + } + } + + impl std::cmp::PartialEq> for $ty { + fn eq(&self, other: &Limit<$ty>) -> bool { + match other { + Limit::Some(other) => self.eq(other), + Limit::Unlimited => false, + } + } + } + + impl std::cmp::PartialEq> for Limit<$ty> { + fn eq(&self, other: &Limit<$ty>) -> bool { + match (self, other) { + (Limit::Some(this), Limit::Some(other)) => this.eq(other), + (Limit::Unlimited, Limit::Unlimited) => true, + _ => false, + } + } + } + + impl std::cmp::Eq for Limit<$ty> {} + + impl std::cmp::PartialOrd<$ty> for Limit<$ty> { + fn partial_cmp(&self, other: &$ty) -> Option { + match self { + Limit::Some(v) => v.partial_cmp(other), + Limit::Unlimited => Some(std::cmp::Ordering::Greater), + } + } + } + + impl std::cmp::PartialOrd> for $ty { + fn partial_cmp(&self, other: &Limit<$ty>) -> Option { + match other { + Limit::Some(other) => self.partial_cmp(other), + Limit::Unlimited => Some(std::cmp::Ordering::Less), + } + } + } + }; +} + +impls!(usize); +impls!(std::time::Duration); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct P2pLimits { + pub identify_message: Limit, + pub kademlia_request: Limit, + pub kademlia_response: Limit, +} + +impl Default for P2pLimits { + fn default() -> Self { + let identify_message = Limit::Some(0x1000); + let kademlia_request = Limit::Some(50); + let kademlia_response = identify_message.map(|v| v * 20); // should be enough to fit 20 addresses supplied by identify + Self { + identify_message, + kademlia_request, + kademlia_response, + } + } +} + +#[cfg(test)] +mod tests { + + use super::Limit; + + #[test] + fn test_limits() { + let limit = Limit::Some(10); + assert!(0 < limit); + assert!(10 <= limit); + assert!(11 > limit); + + let unlimited = Limit::Unlimited; + assert!(0 < unlimited); + assert!(usize::MAX < unlimited); + } +} diff --git a/p2p/src/p2p_reducer.rs b/p2p/src/p2p_reducer.rs index a272d407e3..5a9e6b1daf 100644 --- a/p2p/src/p2p_reducer.rs +++ b/p2p/src/p2p_reducer.rs @@ -113,7 +113,9 @@ impl P2pState { } } }, - P2pAction::Network(action) => self.network.reducer(meta.with_action(action)), + P2pAction::Network(action) => self + .network + .reducer(meta.with_action(action), &self.config.limits), } } } diff --git a/p2p/testing/src/cluster.rs b/p2p/testing/src/cluster.rs index fe523b3239..8d5cc2219e 100644 --- a/p2p/testing/src/cluster.rs +++ b/p2p/testing/src/cluster.rs @@ -101,7 +101,7 @@ impl Default for ClusterBuilder { ports: None, ip: Ipv4Addr::LOCALHOST.into(), idle_duration: Duration::from_millis(100), - is_error: |_| false, + is_error: super::event::is_error, total_duration: Duration::from_secs(60), } } @@ -360,6 +360,7 @@ impl Cluster { max_peers: 100, peer_discovery: config.discovery, timeouts: config.timeouts, + limits: config.limits, initial_time: Duration::ZERO, }; diff --git a/p2p/testing/src/event.rs b/p2p/testing/src/event.rs index ebd8365b3a..fb04e2892c 100644 --- a/p2p/testing/src/event.rs +++ b/p2p/testing/src/event.rs @@ -7,9 +7,11 @@ use p2p::{ identify::P2pIdentifyAction, network::identify::P2pNetworkIdentify, peer::P2pPeerAction, - P2pAction, P2pEvent, PeerId, + MioEvent, P2pAction, P2pEvent, PeerId, }; +use crate::cluster::ClusterEvent; + #[derive(Debug)] pub enum RustNodeEvent { Interface { @@ -188,3 +190,51 @@ pub(super) fn event_mapper_effect(store: &mut super::redux::Store, action: P2pAc _ => {} } } + +pub fn is_error(event: &ClusterEvent) -> bool { + let ClusterEvent::Rust { event, .. } = event else { + return false; + }; + match event { + RustNodeEvent::ListenerError { .. } => true, + RustNodeEvent::PeerConnectionError { .. } => true, + RustNodeEvent::PeerDisconnected { .. } => true, + RustNodeEvent::P2p { event } => match event { + P2pEvent::Connection(_event) => false, // TODO + P2pEvent::Channel(_event) => false, // TODO + P2pEvent::MioEvent(event) => matches!( + event, + MioEvent::ListenerError { .. } + | MioEvent::IncomingConnectionDidAccept(_, Err(_)) + | MioEvent::IncomingDataDidReceive(_, Err(_)) + | MioEvent::OutgoingConnectionDidConnect(_, Err(_)) + | MioEvent::OutgoingDataDidSend(_, Err(_)) + | MioEvent::ConnectionDidClose(_, Err(_)) + ), + }, + _ => false, + } +} + +pub fn allow_disconnections(event: &ClusterEvent) -> bool { + let ClusterEvent::Rust { event, .. } = event else { + return false; + }; + match event { + RustNodeEvent::ListenerError { .. } => true, + RustNodeEvent::PeerConnectionError { .. } => false, + RustNodeEvent::PeerDisconnected { .. } => true, + RustNodeEvent::P2p { event } => match event { + P2pEvent::Connection(_event) => false, // TODO + P2pEvent::Channel(_event) => false, // TODO + P2pEvent::MioEvent(event) => matches!( + event, + MioEvent::ListenerError { .. } | MioEvent::IncomingConnectionDidAccept(_, Err(_)) // | MioEvent::IncomingDataDidReceive(_, Err(_)) + // | MioEvent::OutgoingConnectionDidConnect(_, Err(_)) + // | MioEvent::OutgoingDataDidSend(_, Err(_)) + // | MioEvent::ConnectionDidClose(_, Err(_)) + ), + }, + _ => false, + } +} diff --git a/p2p/testing/src/rust_node.rs b/p2p/testing/src/rust_node.rs index e87a17a996..b89cbfa5ac 100644 --- a/p2p/testing/src/rust_node.rs +++ b/p2p/testing/src/rust_node.rs @@ -5,7 +5,7 @@ use std::{ }; use futures::Stream; -use p2p::{P2pAction, P2pEvent, P2pState, P2pTimeouts, PeerId}; +use p2p::{P2pAction, P2pEvent, P2pLimits, P2pState, P2pTimeouts, PeerId}; use redux::{EnablingCondition, SubStore}; use tokio::sync::mpsc; @@ -25,6 +25,7 @@ pub struct RustNodeConfig { pub peer_id: PeerIdConfig, pub initial_peers: Vec, pub timeouts: P2pTimeouts, + pub limits: P2pLimits, pub discovery: bool, } @@ -47,6 +48,11 @@ impl RustNodeConfig { self } + pub fn with_limits(mut self, limits: P2pLimits) -> Self { + self.limits = limits; + self + } + pub fn with_discovery(mut self, discovery: bool) -> Self { self.discovery = discovery; self diff --git a/p2p/tests/connection.rs b/p2p/tests/connection.rs index ad1217e70b..b3e47e0fba 100644 --- a/p2p/tests/connection.rs +++ b/p2p/tests/connection.rs @@ -3,6 +3,7 @@ use std::time::Duration; use p2p::PeerId; use p2p_testing::{ cluster::{Cluster, ClusterBuilder, NodeId}, + event::allow_disconnections, libp2p_node::Libp2pNodeConfig, rust_node::{RustNodeConfig, RustNodeId}, utils::{ @@ -133,6 +134,7 @@ async fn mutual_rust_to_rust() -> anyhow::Result<()> { let mut cluster = ClusterBuilder::default() .ports_with_len(10) .total_duration(Duration::from_secs(10)) + .is_error(allow_disconnections) .start() .await?; @@ -171,6 +173,7 @@ async fn mutual_rust_to_rust_many() -> anyhow::Result<()> { let mut cluster = ClusterBuilder::default() .ports_with_len(NUM * 2) .total_duration(Duration::from_secs(60)) + .is_error(allow_disconnections) .start() .await?; diff --git a/p2p/tests/kademlia.rs b/p2p/tests/kademlia.rs index 8a8ec93500..19fdd18e00 100644 --- a/p2p/tests/kademlia.rs +++ b/p2p/tests/kademlia.rs @@ -1,7 +1,7 @@ use p2p::{identity::SecretKey, P2pNetworkKadBucket, PeerId}; use p2p_testing::{ cluster::{Cluster, ClusterBuilder, ClusterEvent, Listener}, - event::RustNodeEvent, + event::{allow_disconnections, RustNodeEvent}, futures::TryStreamExt, predicates::kad_finished_bootstrap, rust_node::{RustNodeConfig, RustNodeId}, @@ -186,6 +186,7 @@ async fn bootstrap_no_peers() -> anyhow::Result<()> { let mut cluster = ClusterBuilder::new() .ports_with_len(3) .idle_duration(Duration::from_millis(100)) + .is_error(allow_disconnections) .start() .await?; @@ -221,6 +222,7 @@ async fn bootstrap_no_peers() -> anyhow::Result<()> { /// A node should be able to discover and connect a node connected to the seed node. #[tokio::test] async fn discovery_seed_single_peer() -> anyhow::Result<()> { + std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", false.to_string()); let mut cluster = ClusterBuilder::new() .ports_with_len(6) .idle_duration(Duration::from_millis(100)) @@ -254,15 +256,15 @@ async fn discovery_seed_single_peer() -> anyhow::Result<()> { #[tokio::test] async fn discovery_seed_multiple_peers() -> anyhow::Result<()> { + std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", false.to_string()); const PEERS: usize = 10; let mut cluster = ClusterBuilder::new() .ports_with_len(PEERS as u16 * 2 + 4) .idle_duration(Duration::from_millis(100)) + .is_error(allow_disconnections) .start() .await?; - std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", false.to_string()); - let [seed, nodes @ ..]: [_; PEERS + 1] = p2p_testing::utils::rust_nodes_from_config(&mut cluster, rust_config())?; let peer_ids = peer_ids(&cluster, nodes); From 267372ef028aad7e5d9af7bc4254ba9adbd6f2d0 Mon Sep 17 00:00:00 2001 From: Alexander Koptelov Date: Thu, 30 May 2024 15:16:36 +0300 Subject: [PATCH 2/5] task(p2p): report limit violation for identify --- .../p2p_network_identify_stream_effects.rs | 11 +++++++++- .../p2p_network_identify_stream_reducer.rs | 21 +++++++++--------- .../p2p_network_identify_stream_state.rs | 8 +++++-- .../stream/p2p_network_kad_stream_reducer.rs | 22 +++++++++---------- .../stream/p2p_network_kad_stream_state.rs | 20 ++++------------- p2p/src/network/mod.rs | 15 +++++++++++++ p2p/src/p2p_config.rs | 20 ++++++++++++++--- p2p/testing/Cargo.toml | 2 -- 8 files changed, 74 insertions(+), 45 deletions(-) diff --git a/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs b/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs index 0705876f33..6f72134af6 100644 --- a/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs +++ b/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs @@ -8,7 +8,11 @@ use super::{ super::{pb, P2pNetworkIdentify}, P2pNetworkIdentifyStreamAction, }; -use crate::{identify::P2pIdentifyAction, token, Data, P2pNetworkService, P2pNetworkYamuxAction}; +use crate::{ + disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, + identify::P2pIdentifyAction, + token, Data, P2pNetworkSchedulerAction, P2pNetworkService, P2pNetworkYamuxAction, +}; fn get_addrs(addr: &SocketAddr, net_svc: &mut S) -> I where @@ -172,6 +176,11 @@ impl P2pNetworkIdentifyStreamAction { } S::Error(err) => { warn!(meta.time(); summary = "error handling Identify action", error = err, action = format!("{self:?}")); + store.dispatch(P2pDisconnectionAction::Init { + peer_id, + reason: P2pDisconnectionReason::NetworkError(err.clone().into()), + }); + store.dispatch(P2pNetworkSchedulerAction::PruneStreams { peer_id }); Ok(()) } _ => unimplemented!(), diff --git a/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs b/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs index 4692672f3b..f88fc0f605 100644 --- a/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs +++ b/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs @@ -3,7 +3,7 @@ use super::{ }; use crate::{ network::identify::{pb::Identify, P2pNetworkIdentify}, - P2pLimits, + P2pLimits, P2pNetworkStreamProtobufError, }; use prost::Message; use quick_protobuf::BytesReader; @@ -42,13 +42,16 @@ impl P2pNetworkIdentifyStreamState { let data = &data.0; let mut reader = BytesReader::from_bytes(data); let Ok(len) = reader.read_varint32(data).map(|v| v as usize) else { - *self = S::Error("error reading message length".to_owned()); + *self = S::Error(P2pNetworkStreamProtobufError::MessageLength); return Ok(()); }; // TODO: implement as configuration option - if len > limits.identify_message { - *self = S::Error(format!("Identify message is too long ({})", len)); + if len > limits.identify_message() { + *self = S::Error(P2pNetworkStreamProtobufError::Limit( + len, + limits.identify_message(), + )); return Ok(()); } @@ -98,9 +101,9 @@ impl P2pNetworkIdentifyStreamState { let message = match Identify::decode(&data[..len]) { Ok(v) => v, Err(e) => { - *self = P2pNetworkIdentifyStreamState::Error(format!( - "error reading protobuf message: {e}" - )); + *self = P2pNetworkIdentifyStreamState::Error( + P2pNetworkStreamProtobufError::Message(e.into()), + ); return Ok(()); } }; @@ -108,9 +111,7 @@ impl P2pNetworkIdentifyStreamState { let data = match P2pNetworkIdentify::try_from(message.clone()) { Ok(v) => v, Err(e) => { - *self = P2pNetworkIdentifyStreamState::Error(format!( - "error converting protobuf message: {e}" - )); + *self = P2pNetworkIdentifyStreamState::Error(e.into()); return Ok(()); } }; diff --git a/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs b/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs index b3c6ce4a14..cb92cc8880 100644 --- a/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs +++ b/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs @@ -1,4 +1,4 @@ -use crate::network::identify::P2pNetworkIdentify; +use crate::{network::identify::P2pNetworkIdentify, P2pNetworkStreamProtobufError}; use serde::{Deserialize, Serialize}; #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -36,7 +36,7 @@ pub enum P2pNetworkIdentifyStreamState { data: P2pNetworkIdentify, }, /// Error handling the stream. - Error(String), + Error(P2pNetworkStreamProtobufError), } impl P2pNetworkIdentifyStreamState { @@ -50,3 +50,7 @@ impl From for P2pNetworkIdentifyStreamState { P2pNetworkIdentifyStreamState::IdentifyReceived { data } } } + +#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] +#[error("identify stream: {0}")] +pub struct P2pNetworkIdentifyStreamError(#[from] P2pNetworkStreamProtobufError); diff --git a/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs b/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs index 661d381d25..7ce1693f1c 100644 --- a/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs +++ b/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs @@ -2,7 +2,7 @@ use quick_protobuf::{serialize_into_vec, BytesReader}; use redux::ActionWithMeta; use crate::{ - stream::P2pNetworkKadStreamError, P2pLimits, P2pNetworkKademliaRpcReply, + stream::P2pNetworkStreamProtobufError, P2pLimits, P2pNetworkKademliaRpcReply, P2pNetworkKademliaRpcRequest, }; @@ -34,15 +34,15 @@ impl P2pNetworkKadIncomingStreamState { let mut reader = BytesReader::from_bytes(data); let Ok(len) = reader.read_varint32(data).map(|v| v as usize) else { - *self = S::Error(P2pNetworkKadStreamError::MessageLength); + *self = S::Error(P2pNetworkStreamProtobufError::MessageLength); return Ok(()); }; println!("=== kademlia request len: {len}"); - if len > limits.kademlia_request { - *self = S::Error(P2pNetworkKadStreamError::Limit( + if len > limits.kademlia_request() { + *self = S::Error(P2pNetworkStreamProtobufError::Limit( len, - limits.kademlia_request, + limits.kademlia_request(), )); return Ok(()); } @@ -100,7 +100,7 @@ impl P2pNetworkKadIncomingStreamState { let message = match reader.read_message_by_len::(data, len) { Ok(v) => v, Err(e) => { - *self = Error(P2pNetworkKadStreamError::Message(e.to_string())); + *self = Error(P2pNetworkStreamProtobufError::Message(e.to_string())); return Ok(()); } }; @@ -151,15 +151,15 @@ impl P2pNetworkKadOutgoingStreamState { let mut reader = BytesReader::from_bytes(data); let Ok(len) = reader.read_varint32(data).map(|v| v as usize) else { - *self = S::Error(P2pNetworkKadStreamError::MessageLength); + *self = S::Error(P2pNetworkStreamProtobufError::MessageLength); return Ok(()); }; println!("=== kademlia response len: {len}"); - if len > limits.kademlia_response { - *self = S::Error(P2pNetworkKadStreamError::Limit( + if len > limits.kademlia_response() { + *self = S::Error(P2pNetworkStreamProtobufError::Limit( len, - limits.kademlia_response, + limits.kademlia_response(), )); return Ok(()); } @@ -211,7 +211,7 @@ impl P2pNetworkKadOutgoingStreamState { let message = match reader.read_message_by_len::(data, len) { Ok(v) => v, Err(e) => { - *self = Error(P2pNetworkKadStreamError::Message(e.to_string())); + *self = Error(P2pNetworkStreamProtobufError::Message(e.to_string())); return Ok(()); } }; diff --git a/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs b/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs index ec8a357382..d1f3ab350b 100644 --- a/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs +++ b/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs @@ -42,7 +42,7 @@ pub enum P2pNetworkKadIncomingStreamState { Closed, /// Error handling the stream. /// TODO: use enum for errors. - Error(P2pNetworkKadStreamError), + Error(P2pNetworkStreamProtobufError), } #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub enum P2pNetworkKadOutgoingStreamState { @@ -64,25 +64,13 @@ pub enum P2pNetworkKadOutgoingStreamState { Closed, /// Error handling the stream. /// TODO: use enum for errors. - Error(P2pNetworkKadStreamError), -} - -#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] -pub enum P2pNetworkKadStreamError { - #[error("error reading message length")] - MessageLength, - #[error("message is too long: {0} exceeds {1}")] - Limit(usize, Limit), - #[error("error reading message: {0}")] - Message(String), - #[error("error converting protobuf message: {0}")] - Convert(#[from] P2pNetworkKademliaRpcFromMessageError), + Error(P2pNetworkStreamProtobufError), } #[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] #[error("kademlia incoming stream: {0}")] -pub struct P2pNetworkKadIncomingStreamError(#[from] P2pNetworkKadStreamError); +pub struct P2pNetworkKadIncomingStreamError(#[from] P2pNetworkStreamProtobufError); #[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] #[error("kademlia incoming stream: {0}")] -pub struct P2pNetworkKadOutgoingStreamError(#[from] P2pNetworkKadStreamError); +pub struct P2pNetworkKadOutgoingStreamError(#[from] P2pNetworkStreamProtobufError); diff --git a/p2p/src/network/mod.rs b/p2p/src/network/mod.rs index e27829c89a..39dc4f1530 100644 --- a/p2p/src/network/mod.rs +++ b/p2p/src/network/mod.rs @@ -172,7 +172,22 @@ pub enum P2pNetworkError { #[error("select error")] SelectError, #[error(transparent)] + IdentifyStreamError(#[from] P2pNetworkIdentifyStreamError), + #[error(transparent)] KademliaIncomingStreamError(#[from] P2pNetworkKadIncomingStreamError), #[error(transparent)] KademliaOutgoingStreamError(#[from] P2pNetworkKadOutgoingStreamError), } + +/// Errors that might happen while handling protobuf messages received via a stream. +#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] +pub enum P2pNetworkStreamProtobufError { + #[error("error reading message length")] + MessageLength, + #[error("message is too long: {0} exceeds {1}")] + Limit(usize, Limit), + #[error("error reading message: {0}")] + Message(String), + #[error("error converting protobuf message: {0}")] + Convert(#[from] P2pNetworkKademliaRpcFromMessageError), +} diff --git a/p2p/src/p2p_config.rs b/p2p/src/p2p_config.rs index df21db45b4..93f76eb455 100644 --- a/p2p/src/p2p_config.rs +++ b/p2p/src/p2p_config.rs @@ -166,9 +166,23 @@ impls!(std::time::Duration); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct P2pLimits { - pub identify_message: Limit, - pub kademlia_request: Limit, - pub kademlia_response: Limit, + identify_message: Limit, + kademlia_request: Limit, + kademlia_response: Limit, +} + +macro_rules! getter { + ($name:ident) => { + pub fn $name(&self) -> Limit { + self.$name + } + }; +} + +impl P2pLimits { + getter!(identify_message); + getter!(kademlia_request); + getter!(kademlia_response); } impl Default for P2pLimits { diff --git a/p2p/testing/Cargo.toml b/p2p/testing/Cargo.toml index b701832cc2..e273d04197 100644 --- a/p2p/testing/Cargo.toml +++ b/p2p/testing/Cargo.toml @@ -17,8 +17,6 @@ futures = "0.3.30" rand = "0.8.5" derive_more = "0.99.17" thiserror = "1.0.57" -derive-getters = "0.3.0" -derive_builder = "0.20.0" openmina-core = { path = "../../core" } pin-project-lite = "0.2" lazy_static = "1.4.0" From 4cb86c07a221a2032a36148d8ed71e34d24ef07e Mon Sep 17 00:00:00 2001 From: Alexander Koptelov Date: Thu, 30 May 2024 16:32:26 +0300 Subject: [PATCH 3/5] task(p2p): fix connections limit --- Cargo.lock | 50 +---------- cli/src/commands/node/mod.rs | 3 +- node/testing/src/cluster/mod.rs | 3 +- .../identify/p2p_network_identify_protocol.rs | 2 +- .../p2p_network_identify_stream_effects.rs | 10 ++- .../p2p_network_identify_stream_reducer.rs | 2 +- .../p2p_network_identify_stream_state.rs | 6 +- .../stream/p2p_network_kad_stream_reducer.rs | 4 +- .../stream/p2p_network_kad_stream_state.rs | 15 ++-- p2p/src/network/mod.rs | 7 +- .../p2p_network_scheduler_effects.rs | 5 +- p2p/src/p2p_config.rs | 83 ++++++++++++++++--- p2p/src/p2p_state.rs | 14 ++-- p2p/testing/src/cluster.rs | 8 +- 14 files changed, 120 insertions(+), 92 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9fa386ab1d..9ef65b515f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -180,7 +180,7 @@ version = "0.1.1" source = "git+https://github.com/openmina/alloc-test.git#0d3951eaddc58a6ea2d2134966ed5954234889d3" dependencies = [ "clap 4.5.2", - "derive_builder 0.11.2", + "derive_builder", "derive_more", "num", "serde", @@ -1586,33 +1586,13 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "derive-getters" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2c35ab6e03642397cdda1dd58abbc05d418aef8e36297f336d5aba060fe8df" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "derive_builder" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d07adf7be193b71cc36b193d0f5fe60b918a3a9db4dad0449f57bcfd519704a3" dependencies = [ - "derive_builder_macro 0.11.2", -] - -[[package]] -name = "derive_builder" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0350b5cb0331628a5916d6c5c0b72e97393b8b6b03b47a9284f4e7f5a405ffd7" -dependencies = [ - "derive_builder_macro 0.20.0", + "derive_builder_macro", ] [[package]] @@ -1627,38 +1607,16 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "derive_builder_core" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d48cda787f839151732d396ac69e3473923d54312c070ee21e9effcaa8ca0b1d" -dependencies = [ - "darling 0.20.6", - "proc-macro2", - "quote", - "syn 2.0.58", -] - [[package]] name = "derive_builder_macro" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f0314b72bed045f3a68671b3c86328386762c93f82d98c65c3cb5e5f573dd68" dependencies = [ - "derive_builder_core 0.11.2", + "derive_builder_core", "syn 1.0.109", ] -[[package]] -name = "derive_builder_macro" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b" -dependencies = [ - "derive_builder_core 0.20.0", - "syn 2.0.58", -] - [[package]] name = "derive_more" version = "0.99.17" @@ -4847,8 +4805,6 @@ dependencies = [ name = "p2p-testing" version = "0.4.0" dependencies = [ - "derive-getters", - "derive_builder 0.20.0", "derive_more", "futures", "getrandom 0.2.14", diff --git a/cli/src/commands/node/mod.rs b/cli/src/commands/node/mod.rs index 08ca356505..0ba5ffd882 100644 --- a/cli/src/commands/node/mod.rs +++ b/cli/src/commands/node/mod.rs @@ -216,7 +216,6 @@ impl Node { listen_port: self.port, identity_pub_key: pub_key, initial_peers: self.peers, - max_peers: 100, ask_initial_peers_interval: Duration::from_secs(3600), enabled_channels: ChannelId::for_libp2p().collect(), peer_discovery: !self.no_peers_discovery, @@ -224,7 +223,7 @@ impl Node { .duration_since(SystemTime::UNIX_EPOCH) .expect("linear time"), timeouts: P2pTimeouts::default(), - limits: P2pLimits::default(), + limits: P2pLimits::default().with_max_peers(Some(100)), }, transition_frontier, block_producer: block_producer.clone().map(|(config, _)| config), diff --git a/node/testing/src/cluster/mod.rs b/node/testing/src/cluster/mod.rs index 388a78be6a..34a27e2755 100644 --- a/node/testing/src/cluster/mod.rs +++ b/node/testing/src/cluster/mod.rs @@ -278,12 +278,11 @@ impl Cluster { listen_port: http_port, identity_pub_key: pub_key, initial_peers, - max_peers: testing_config.max_peers, ask_initial_peers_interval: testing_config.ask_initial_peers_interval, enabled_channels: ChannelId::iter_all().collect(), peer_discovery: true, timeouts: testing_config.timeouts, - limits: P2pLimits::default(), + limits: P2pLimits::default().with_max_peers(Some(testing_config.max_peers)), initial_time: testing_config .initial_time .checked_sub(redux::Timestamp::ZERO) diff --git a/p2p/src/network/identify/p2p_network_identify_protocol.rs b/p2p/src/network/identify/p2p_network_identify_protocol.rs index 4314d64455..5c437b708e 100644 --- a/p2p/src/network/identify/p2p_network_identify_protocol.rs +++ b/p2p/src/network/identify/p2p_network_identify_protocol.rs @@ -17,7 +17,7 @@ pub struct P2pNetworkIdentify { pub protocols: Vec, } -#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, thiserror::Error)] pub enum P2pNetworkIdentifyFromMessageError { #[error("cant parse protocol: {0}")] UnsupportedProtocol(String), diff --git a/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs b/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs index 6f72134af6..ec8a881827 100644 --- a/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs +++ b/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs @@ -11,7 +11,9 @@ use super::{ use crate::{ disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, identify::P2pIdentifyAction, - token, Data, P2pNetworkSchedulerAction, P2pNetworkService, P2pNetworkYamuxAction, + network::identify::stream::P2pNetworkIdentifyStreamError, + token, Data, P2pNetworkSchedulerAction, P2pNetworkService, + P2pNetworkYamuxAction, }; fn get_addrs(addr: &SocketAddr, net_svc: &mut S) -> I @@ -175,10 +177,12 @@ impl P2pNetworkIdentifyStreamAction { Ok(()) } S::Error(err) => { - warn!(meta.time(); summary = "error handling Identify action", error = err, action = format!("{self:?}")); + warn!(meta.time(); summary = "error handling Identify action", error = display(err)); store.dispatch(P2pDisconnectionAction::Init { peer_id, - reason: P2pDisconnectionReason::NetworkError(err.clone().into()), + reason: P2pDisconnectionReason::NetworkError( + P2pNetworkIdentifyStreamError::from(err.clone()).into(), + ), }); store.dispatch(P2pNetworkSchedulerAction::PruneStreams { peer_id }); Ok(()) diff --git a/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs b/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs index f88fc0f605..62b175098f 100644 --- a/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs +++ b/p2p/src/network/identify/stream/p2p_network_identify_stream_reducer.rs @@ -102,7 +102,7 @@ impl P2pNetworkIdentifyStreamState { Ok(v) => v, Err(e) => { *self = P2pNetworkIdentifyStreamState::Error( - P2pNetworkStreamProtobufError::Message(e.into()), + P2pNetworkStreamProtobufError::Message(e.to_string()), ); return Ok(()); } diff --git a/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs b/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs index cb92cc8880..de02002084 100644 --- a/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs +++ b/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs @@ -1,4 +1,4 @@ -use crate::{network::identify::P2pNetworkIdentify, P2pNetworkStreamProtobufError}; +use crate::{network::identify::{P2pNetworkIdentify, P2pNetworkIdentifyFromMessageError}, P2pNetworkStreamProtobufError}; use serde::{Deserialize, Serialize}; #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -36,7 +36,7 @@ pub enum P2pNetworkIdentifyStreamState { data: P2pNetworkIdentify, }, /// Error handling the stream. - Error(P2pNetworkStreamProtobufError), + Error(P2pNetworkStreamProtobufError), } impl P2pNetworkIdentifyStreamState { @@ -53,4 +53,4 @@ impl From for P2pNetworkIdentifyStreamState { #[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] #[error("identify stream: {0}")] -pub struct P2pNetworkIdentifyStreamError(#[from] P2pNetworkStreamProtobufError); +pub struct P2pNetworkIdentifyStreamError(#[from] P2pNetworkStreamProtobufError); diff --git a/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs b/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs index 7ce1693f1c..33acbb7eef 100644 --- a/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs +++ b/p2p/src/network/kad/stream/p2p_network_kad_stream_reducer.rs @@ -2,8 +2,8 @@ use quick_protobuf::{serialize_into_vec, BytesReader}; use redux::ActionWithMeta; use crate::{ - stream::P2pNetworkStreamProtobufError, P2pLimits, P2pNetworkKademliaRpcReply, - P2pNetworkKademliaRpcRequest, + P2pLimits, P2pNetworkKademliaRpcReply, P2pNetworkKademliaRpcRequest, + P2pNetworkStreamProtobufError, }; use super::{ diff --git a/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs b/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs index d1f3ab350b..f61c3bd3fa 100644 --- a/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs +++ b/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs @@ -1,8 +1,7 @@ use serde::{Deserialize, Serialize}; use crate::{ - Limit, P2pNetworkKademliaRpcFromMessageError, P2pNetworkKademliaRpcReply, - P2pNetworkKademliaRpcRequest, + P2pNetworkKademliaRpcFromMessageError, P2pNetworkKademliaRpcReply, P2pNetworkKademliaRpcRequest, P2pNetworkStreamProtobufError }; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -42,7 +41,7 @@ pub enum P2pNetworkKadIncomingStreamState { Closed, /// Error handling the stream. /// TODO: use enum for errors. - Error(P2pNetworkStreamProtobufError), + Error(P2pNetworkStreamProtobufError), } #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub enum P2pNetworkKadOutgoingStreamState { @@ -64,13 +63,17 @@ pub enum P2pNetworkKadOutgoingStreamState { Closed, /// Error handling the stream. /// TODO: use enum for errors. - Error(P2pNetworkStreamProtobufError), + Error(P2pNetworkStreamProtobufError), } #[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] #[error("kademlia incoming stream: {0}")] -pub struct P2pNetworkKadIncomingStreamError(#[from] P2pNetworkStreamProtobufError); +pub struct P2pNetworkKadIncomingStreamError( + #[from] P2pNetworkStreamProtobufError, +); #[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] #[error("kademlia incoming stream: {0}")] -pub struct P2pNetworkKadOutgoingStreamError(#[from] P2pNetworkStreamProtobufError); +pub struct P2pNetworkKadOutgoingStreamError( + #[from] P2pNetworkStreamProtobufError, +); diff --git a/p2p/src/network/mod.rs b/p2p/src/network/mod.rs index 39dc4f1530..ffa1f7e3b6 100644 --- a/p2p/src/network/mod.rs +++ b/p2p/src/network/mod.rs @@ -2,6 +2,9 @@ mod p2p_network_actions; use serde::Deserialize; use serde::Serialize; +use crate::Limit; + +use self::identify::stream::P2pNetworkIdentifyStreamError; pub use self::p2p_network_actions::*; mod p2p_network_service; @@ -181,7 +184,7 @@ pub enum P2pNetworkError { /// Errors that might happen while handling protobuf messages received via a stream. #[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] -pub enum P2pNetworkStreamProtobufError { +pub enum P2pNetworkStreamProtobufError { #[error("error reading message length")] MessageLength, #[error("message is too long: {0} exceeds {1}")] @@ -189,5 +192,5 @@ pub enum P2pNetworkStreamProtobufError { #[error("error reading message: {0}")] Message(String), #[error("error converting protobuf message: {0}")] - Convert(#[from] P2pNetworkKademliaRpcFromMessageError), + Convert(#[from] T), } diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs b/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs index 0767870996..5b3fe0b357 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs @@ -41,7 +41,10 @@ impl P2pNetworkSchedulerAction { // TODO: handle this error? } Self::IncomingConnectionIsReady { listener, .. } => { - if store.state().already_has_max_peers() { + let state = store.state(); + if state.network.scheduler.connections.len() + >= state.config.limits.max_connections() + { store.service().send_mio_cmd(MioCmd::Refuse(listener)); } else { store.service().send_mio_cmd(MioCmd::Accept(listener)); diff --git a/p2p/src/p2p_config.rs b/p2p/src/p2p_config.rs index 93f76eb455..966123c162 100644 --- a/p2p/src/p2p_config.rs +++ b/p2p/src/p2p_config.rs @@ -23,9 +23,6 @@ pub struct P2pConfig { pub enabled_channels: BTreeSet, - /// Maximal allowed number of connections. - pub max_peers: usize, - pub timeouts: P2pTimeouts, pub limits: P2pLimits, @@ -111,6 +108,25 @@ impl Limit { macro_rules! impls { ($ty:ty) => { + + impl From> for Limit<$ty> { + fn from(value: Option<$ty>) -> Self { + match value { + Some(v) => Limit::Some(v), + None => Limit::Unlimited, + } + } + } + + impl From> for Option<$ty> { + fn from(value: Limit<$ty>) -> Self { + match value { + Limit::Some(v) => Some(v), + Limit::Unlimited => None, + } + } + } + impl std::cmp::PartialEq<$ty> for Limit<$ty> { fn eq(&self, other: &$ty) -> bool { match self { @@ -166,31 +182,78 @@ impls!(std::time::Duration); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct P2pLimits { + max_peers: Limit, identify_message: Limit, kademlia_request: Limit, kademlia_response: Limit, } -macro_rules! getter { - ($name:ident) => { - pub fn $name(&self) -> Limit { - self.$name +macro_rules! limit { + (#[$meta:meta] $limit:ident) => { + #[$meta] + pub fn $limit(&self) -> Limit { + self.$limit + } + }; + + (#[$meta:meta] $limit:ident, #[$setter_meta:meta] $setter:ident) => { + limit!(#[$meta] $limit); + + #[$setter_meta] + pub fn $setter>>(mut self, $limit: T) -> Self { + self.$limit = $limit.into(); + self + } + }; + + (#[$meta:meta] $limit:ident(&$self:ident): $expr:expr) => { + #[$meta] + pub fn $limit(&$self) -> Limit { + $expr } }; } impl P2pLimits { - getter!(identify_message); - getter!(kademlia_request); - getter!(kademlia_response); + limit!( + /// Maximum number of peers. + max_peers, + /// Sets maximum number of peers. + with_max_peers + ); + + limit!( + /// Minimum number of peers. + min_peers(&self): self.max_peers.map(|v| (v / 2).max(3).min(v)) + ); + + limit!( + /// Maximum number of connections. + max_connections(&self): self.max_peers.map(|v| v + 10) + ); + + limit!( + /// Maximum length of Identify message. + identify_message + ); + limit!( + /// Maximum length of Kademlia request message. + kademlia_request + ); + limit!( + /// Maximum length of Kademlia response message. + kademlia_response + ); } impl Default for P2pLimits { fn default() -> Self { + let max_peers = Limit::Some(100); let identify_message = Limit::Some(0x1000); let kademlia_request = Limit::Some(50); let kademlia_response = identify_message.map(|v| v * 20); // should be enough to fit 20 addresses supplied by identify Self { + max_peers, identify_message, kademlia_request, kademlia_response, diff --git a/p2p/src/p2p_state.rs b/p2p/src/p2p_state.rs index ec6e012825..e108974b05 100644 --- a/p2p/src/p2p_state.rs +++ b/p2p/src/p2p_state.rs @@ -1,5 +1,5 @@ use multiaddr::multiaddr; -use openmina_core::{block::ArcBlockWithHash, ChainId}; +use openmina_core::{ChainId, block::ArcBlockWithHash}; use redux::Timestamp; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, BTreeSet}; @@ -12,7 +12,7 @@ use crate::connection::incoming::P2pConnectionIncomingState; use crate::connection::outgoing::{P2pConnectionOutgoingInitOpts, P2pConnectionOutgoingState}; use crate::network::identify::P2pNetworkIdentify; use crate::network::P2pNetworkState; -use crate::{is_time_passed, P2pTimeouts, PeerId}; +use crate::{is_time_passed, Limit, P2pTimeouts, PeerId}; use super::connection::P2pConnectionState; use super::P2pConfig; @@ -195,21 +195,21 @@ impl P2pState { } pub fn already_has_min_peers(&self) -> bool { - self.connected_or_connecting_peers_count() >= self.min_peers() + self.connected_or_connecting_peers_count() >= self.config.limits.min_peers() } pub fn already_has_max_peers(&self) -> bool { - self.connected_or_connecting_peers_count() >= self.config.max_peers + self.connected_or_connecting_peers_count() >= self.config.limits.max_peers() } /// The peers capacity is exceeded. pub fn already_has_max_ready_peers(&self) -> bool { - self.ready_peers_iter().count() >= self.config.max_peers + self.ready_peers_iter().count() >= self.config.limits.max_peers() } /// Minimal number of peers that the node should connect - pub fn min_peers(&self) -> usize { - (self.config.max_peers / 2).max(3) + pub fn min_peers(&self) -> Limit { + self.config.limits.min_peers() } /// Peer with libp2p connection identified by `conn_id`. diff --git a/p2p/testing/src/cluster.rs b/p2p/testing/src/cluster.rs index 8d5cc2219e..eaf5173dc8 100644 --- a/p2p/testing/src/cluster.rs +++ b/p2p/testing/src/cluster.rs @@ -6,7 +6,6 @@ use std::{ time::{Duration, Instant}, }; -use derive_builder::UninitializedFieldError; use futures::StreamExt; use libp2p::{multiaddr::multiaddr, swarm::DialError, Multiaddr}; use openmina_core::{ChainId, BERKELEY_CHAIN_ID}; @@ -158,7 +157,7 @@ impl ClusterBuilder { let chain_id = self.chain_id; let ports = self .ports - .ok_or_else(|| UninitializedFieldError::new("ports"))? + .ok_or_else(|| Error::UninitializedField("ports"))? .ports() .await?; let ip = self.ip; @@ -267,8 +266,8 @@ pub enum Error { NoMorePorts, #[error(transparent)] AddrParse(#[from] P2pConnectionOutgoingInitOptsParseError), - #[error(transparent)] - UninitializedField(#[from] UninitializedFieldError), + #[error("uninitialized field `{0}`")] + UninitializedField(&'static str), #[error("swarm creation error: {0}")] Libp2pSwarm(String), #[error(transparent)] @@ -357,7 +356,6 @@ impl Cluster { initial_peers, ask_initial_peers_interval: Duration::from_secs(5), enabled_channels: p2p::channels::ChannelId::for_libp2p().collect(), - max_peers: 100, peer_discovery: config.discovery, timeouts: config.timeouts, limits: config.limits, From a26f46ff5370ed9aa0640ad624a53c246641de44 Mon Sep 17 00:00:00 2001 From: Alexander Koptelov Date: Thu, 30 May 2024 22:47:12 +0300 Subject: [PATCH 4/5] task(p2p): add limits for RPCs --- .../p2p_network_identify_stream_effects.rs | 3 +- .../p2p_network_identify_stream_state.rs | 9 ++- .../stream/p2p_network_kad_stream_state.rs | 3 +- p2p/src/network/p2p_network_reducer.rs | 2 +- .../network/rpc/p2p_network_rpc_reducer.rs | 55 +++++++++++++++++- p2p/src/network/rpc/p2p_network_rpc_state.rs | 12 +++- p2p/src/p2p_config.rs | 57 ++++++++++++++++++- p2p/src/p2p_state.rs | 2 +- 8 files changed, 130 insertions(+), 13 deletions(-) diff --git a/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs b/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs index ec8a881827..911fd89242 100644 --- a/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs +++ b/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs @@ -12,8 +12,7 @@ use crate::{ disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, identify::P2pIdentifyAction, network::identify::stream::P2pNetworkIdentifyStreamError, - token, Data, P2pNetworkSchedulerAction, P2pNetworkService, - P2pNetworkYamuxAction, + token, Data, P2pNetworkSchedulerAction, P2pNetworkService, P2pNetworkYamuxAction, }; fn get_addrs(addr: &SocketAddr, net_svc: &mut S) -> I diff --git a/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs b/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs index de02002084..7a4a2079ea 100644 --- a/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs +++ b/p2p/src/network/identify/stream/p2p_network_identify_stream_state.rs @@ -1,4 +1,7 @@ -use crate::{network::identify::{P2pNetworkIdentify, P2pNetworkIdentifyFromMessageError}, P2pNetworkStreamProtobufError}; +use crate::{ + network::identify::{P2pNetworkIdentify, P2pNetworkIdentifyFromMessageError}, + P2pNetworkStreamProtobufError, +}; use serde::{Deserialize, Serialize}; #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -53,4 +56,6 @@ impl From for P2pNetworkIdentifyStreamState { #[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] #[error("identify stream: {0}")] -pub struct P2pNetworkIdentifyStreamError(#[from] P2pNetworkStreamProtobufError); +pub struct P2pNetworkIdentifyStreamError( + #[from] P2pNetworkStreamProtobufError, +); diff --git a/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs b/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs index f61c3bd3fa..e14fc25303 100644 --- a/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs +++ b/p2p/src/network/kad/stream/p2p_network_kad_stream_state.rs @@ -1,7 +1,8 @@ use serde::{Deserialize, Serialize}; use crate::{ - P2pNetworkKademliaRpcFromMessageError, P2pNetworkKademliaRpcReply, P2pNetworkKademliaRpcRequest, P2pNetworkStreamProtobufError + P2pNetworkKademliaRpcFromMessageError, P2pNetworkKademliaRpcReply, + P2pNetworkKademliaRpcRequest, P2pNetworkStreamProtobufError, }; #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/p2p/src/network/p2p_network_reducer.rs b/p2p/src/network/p2p_network_reducer.rs index 4964be088c..dd5efc0a13 100644 --- a/p2p/src/network/p2p_network_reducer.rs +++ b/p2p/src/network/p2p_network_reducer.rs @@ -116,7 +116,7 @@ impl P2pNetworkState { } P2pNetworkAction::Rpc(a) => { if let Some(state) = self.find_rpc_state_mut(a) { - state.reducer(meta.with_action(a)) + state.reducer(meta.with_action(a), limits) } } } diff --git a/p2p/src/network/rpc/p2p_network_rpc_reducer.rs b/p2p/src/network/rpc/p2p_network_rpc_reducer.rs index 8aa2de8fc8..b721ae0880 100644 --- a/p2p/src/network/rpc/p2p_network_rpc_reducer.rs +++ b/p2p/src/network/rpc/p2p_network_rpc_reducer.rs @@ -1,10 +1,18 @@ use binprot::BinProtRead; -use mina_p2p_messages::rpc_kernel::{MessageHeader, QueryHeader}; +use mina_p2p_messages::rpc_kernel::{MessageHeader, QueryHeader, RpcMethod}; + +use crate::{Limit, P2pLimits}; + +use self::p2p_network_rpc_state::P2pNetworkRpcError; use super::*; impl P2pNetworkRpcState { - pub fn reducer(&mut self, action: redux::ActionWithMeta<&P2pNetworkRpcAction>) { + pub fn reducer( + &mut self, + action: redux::ActionWithMeta<&P2pNetworkRpcAction>, + limits: &P2pLimits, + ) { if self.error.is_some() { return; } @@ -20,6 +28,10 @@ impl P2pNetworkRpcState { let buf = &self.buffer[offset..]; if let Some(len_bytes) = buf.get(..8).and_then(|s| s.try_into().ok()) { let len = u64::from_le_bytes(len_bytes) as usize; + if let Err(err) = self.check_rpc_limit(len, limits) { + self.error = Some(err); + return; + } if buf.len() >= 8 + len { offset += 8 + len; let mut slice = &buf[8..(8 + len)]; @@ -39,7 +51,7 @@ impl P2pNetworkRpcState { bytes: slice.to_vec().into(), }, Err(err) => { - self.error = Some(err.to_string()); + self.error = Some(P2pNetworkRpcError::Binprot(err.to_string())); continue; } }; @@ -85,4 +97,41 @@ impl P2pNetworkRpcState { _ => {} } } + + fn check_rpc_limit(&self, len: usize, limits: &P2pLimits) -> Result<(), P2pNetworkRpcError> { + let (limit, kind): (_, &[u8]) = if self.is_incoming { + // only requests are allowed + (limits.rpc_query(), b"") + } else if let Some(QueryHeader { tag, .. }) = self.pending.as_ref() { + use mina_p2p_messages::rpc::*; + match tag.as_ref() { + GetBestTipV2::NAME => (limits.rpc_get_best_tip(), GetBestTipV2::NAME), + AnswerSyncLedgerQueryV2::NAME => ( + limits.rpc_answer_sync_ledger_query(), + AnswerSyncLedgerQueryV2::NAME, + ), + GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME => ( + limits.rpc_get_staged_ledger(), + GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME, + ), + GetTransitionChainV2::NAME => ( + limits.rpc_get_transition_chain(), + GetTransitionChainV2::NAME, + ), + GetSomeInitialPeersV1ForV2::NAME => ( + limits.rpc_get_some_initial_peers(), + GetSomeInitialPeersV1ForV2::NAME, + ), + _ => (Limit::Some(0), b""), + } + } else { + (limits.rpc_service_message(), b"") + }; + let kind = String::from_utf8_lossy(kind); + if len > limit { + Err(P2pNetworkRpcError::Limit(kind.into_owned(), len, limit)) + } else { + Ok(()) + } + } } diff --git a/p2p/src/network/rpc/p2p_network_rpc_state.rs b/p2p/src/network/rpc/p2p_network_rpc_state.rs index 1e795149a8..f0e8d563d0 100644 --- a/p2p/src/network/rpc/p2p_network_rpc_state.rs +++ b/p2p/src/network/rpc/p2p_network_rpc_state.rs @@ -29,7 +29,7 @@ pub struct P2pNetworkRpcState { pub is_incoming: bool, pub buffer: Vec, pub incoming: VecDeque, - pub error: Option, + pub error: Option, } impl P2pNetworkRpcState { @@ -74,7 +74,7 @@ impl RpcMessage { .unwrap_or_default(); } Self::Query { header, bytes } => { - MessageHeader::Query(header) + MessageHeader::Query(header.clone()) .binprot_write(&mut v) .unwrap_or_default(); v.extend_from_slice(&bytes); @@ -92,3 +92,11 @@ impl RpcMessage { v } } + +#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)] +pub enum P2pNetworkRpcError { + #[error("error reading binprot message: {0}")] + Binprot(String), + #[error("message {0} with size {1} exceeds limit of {2}")] + Limit(String, usize, Limit), +} diff --git a/p2p/src/p2p_config.rs b/p2p/src/p2p_config.rs index 966123c162..2633828638 100644 --- a/p2p/src/p2p_config.rs +++ b/p2p/src/p2p_config.rs @@ -108,7 +108,6 @@ impl Limit { macro_rules! impls { ($ty:ty) => { - impl From> for Limit<$ty> { fn from(value: Option<$ty>) -> Self { match value { @@ -183,9 +182,18 @@ impls!(std::time::Duration); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct P2pLimits { max_peers: Limit, + identify_message: Limit, kademlia_request: Limit, kademlia_response: Limit, + + rpc_service_message: Limit, + rpc_query: Limit, + rpc_get_best_tip: Limit, + rpc_answer_sync_ledger_query: Limit, + rpc_get_staged_ledger: Limit, + rpc_get_transition_chain: Limit, + rpc_get_some_initial_peers: Limit, } macro_rules! limit { @@ -244,19 +252,66 @@ impl P2pLimits { /// Maximum length of Kademlia response message. kademlia_response ); + + limit!( + #[doc = "RPC service message"] + rpc_service_message + ); + limit!( + #[doc = "RPC query"] + rpc_query + ); + limit!( + #[doc = "RPC get_best_tip"] + rpc_get_best_tip + ); + limit!( + #[doc = "RPC answer_sync_ledger_query"] + rpc_answer_sync_ledger_query + ); + limit!( + #[doc = "RPC get_staged_ledger"] + rpc_get_staged_ledger + ); + limit!( + #[doc = "RPC get_transition_chain"] + rpc_get_transition_chain + ); + limit!( + #[doc = "RPC some_initial_peers"] + rpc_get_some_initial_peers + ); } impl Default for P2pLimits { fn default() -> Self { let max_peers = Limit::Some(100); + let identify_message = Limit::Some(0x1000); let kademlia_request = Limit::Some(50); let kademlia_response = identify_message.map(|v| v * 20); // should be enough to fit 20 addresses supplied by identify + + let rpc_service_message = Limit::Some(7); // 7 for handshake, 1 for heartbeat + let rpc_query = Limit::Some(256); // max is 96 + let rpc_get_best_tip = Limit::Some(3_500_000); // 3182930 as observed, may vary + let rpc_answer_sync_ledger_query = Limit::Some(200_000); // 124823 as observed + let rpc_get_staged_ledger = Limit::Some(40_000_000); // 36371615 as observed, may vary + let rpc_get_transition_chain = Limit::Some(3_500_000); // 2979112 as observed + let rpc_get_some_initial_peers = Limit::Some(32_000); // TODO: calculate Self { max_peers, + identify_message, kademlia_request, kademlia_response, + + rpc_service_message, + rpc_query, + rpc_get_best_tip, + rpc_answer_sync_ledger_query, + rpc_get_staged_ledger, + rpc_get_transition_chain, + rpc_get_some_initial_peers, } } } diff --git a/p2p/src/p2p_state.rs b/p2p/src/p2p_state.rs index e108974b05..42c5982663 100644 --- a/p2p/src/p2p_state.rs +++ b/p2p/src/p2p_state.rs @@ -1,5 +1,5 @@ use multiaddr::multiaddr; -use openmina_core::{ChainId, block::ArcBlockWithHash}; +use openmina_core::{block::ArcBlockWithHash, ChainId}; use redux::Timestamp; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, BTreeSet}; From 4252774e9b40f37b7ec5d4b0f2b2d2fb1e248637 Mon Sep 17 00:00:00 2001 From: Alexander Koptelov Date: Fri, 31 May 2024 18:04:13 +0300 Subject: [PATCH 5/5] task(p2p): refactor p2p-layer errors --- p2p/src/disconnection/mod.rs | 5 +---- .../stream/p2p_network_identify_stream_effects.rs | 15 +++++---------- .../kad/stream/p2p_network_kad_stream_effects.rs | 9 +++------ p2p/src/network/mod.rs | 12 ------------ .../scheduler/p2p_network_scheduler_effects.rs | 10 ++-------- .../scheduler/p2p_network_scheduler_reducer.rs | 15 +++++---------- .../scheduler/p2p_network_scheduler_state.rs | 10 +++++++++- 7 files changed, 25 insertions(+), 51 deletions(-) diff --git a/p2p/src/disconnection/mod.rs b/p2p/src/disconnection/mod.rs index 8cb9de165a..ad399b4d77 100644 --- a/p2p/src/disconnection/mod.rs +++ b/p2p/src/disconnection/mod.rs @@ -11,7 +11,7 @@ pub use p2p_disconnection_service::*; use serde::{Deserialize, Serialize}; -use crate::{channels::ChannelId, connection::RejectionReason, P2pNetworkError}; +use crate::{channels::ChannelId, connection::RejectionReason}; #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, thiserror::Error)] pub enum P2pDisconnectionReason { @@ -37,7 +37,4 @@ pub enum P2pDisconnectionReason { #[error("timeout")] Timeout, - - #[error("network error: {0}")] - NetworkError(#[from] P2pNetworkError), } diff --git a/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs b/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs index 911fd89242..a1885940fb 100644 --- a/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs +++ b/p2p/src/network/identify/stream/p2p_network_identify_stream_effects.rs @@ -9,10 +9,8 @@ use super::{ P2pNetworkIdentifyStreamAction, }; use crate::{ - disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, - identify::P2pIdentifyAction, - network::identify::stream::P2pNetworkIdentifyStreamError, - token, Data, P2pNetworkSchedulerAction, P2pNetworkService, P2pNetworkYamuxAction, + identify::P2pIdentifyAction, network::identify::stream::P2pNetworkIdentifyStreamError, token, + Data, P2pNetworkSchedulerAction, P2pNetworkService, P2pNetworkYamuxAction, }; fn get_addrs(addr: &SocketAddr, net_svc: &mut S) -> I @@ -177,13 +175,10 @@ impl P2pNetworkIdentifyStreamAction { } S::Error(err) => { warn!(meta.time(); summary = "error handling Identify action", error = display(err)); - store.dispatch(P2pDisconnectionAction::Init { - peer_id, - reason: P2pDisconnectionReason::NetworkError( - P2pNetworkIdentifyStreamError::from(err.clone()).into(), - ), + store.dispatch(P2pNetworkSchedulerAction::Error { + addr, + error: P2pNetworkIdentifyStreamError::from(err.clone()).into(), }); - store.dispatch(P2pNetworkSchedulerAction::PruneStreams { peer_id }); Ok(()) } _ => unimplemented!(), diff --git a/p2p/src/network/kad/stream/p2p_network_kad_stream_effects.rs b/p2p/src/network/kad/stream/p2p_network_kad_stream_effects.rs index caf557caba..7012c0c4d1 100644 --- a/p2p/src/network/kad/stream/p2p_network_kad_stream_effects.rs +++ b/p2p/src/network/kad/stream/p2p_network_kad_stream_effects.rs @@ -2,7 +2,6 @@ use openmina_core::warn; use redux::ActionMeta; use crate::{ - disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, stream::{P2pNetworkKadIncomingStreamError, P2pNetworkKadOutgoingStreamError}, Data, P2pNetworkKademliaAction, P2pNetworkSchedulerAction, P2pNetworkYamuxAction, }; @@ -188,12 +187,10 @@ impl P2pNetworkKademliaStreamAction { D::Incoming(_) => P2pNetworkKadIncomingStreamError::from(err.clone()).into(), D::Outgoing(_) => P2pNetworkKadOutgoingStreamError::from(err.clone()).into(), }; - let peer_id = *action.peer_id(); - store.dispatch(P2pDisconnectionAction::Init { - peer_id, - reason: P2pDisconnectionReason::NetworkError(error), + store.dispatch(P2pNetworkSchedulerAction::Error { + addr: *action.addr(), + error, }); - store.dispatch(P2pNetworkSchedulerAction::PruneStreams { peer_id }); Ok(()) } (action, _) => Err(format!("incorrect state {state:?} for action {action:?}")), diff --git a/p2p/src/network/mod.rs b/p2p/src/network/mod.rs index ffa1f7e3b6..aba1301242 100644 --- a/p2p/src/network/mod.rs +++ b/p2p/src/network/mod.rs @@ -170,18 +170,6 @@ mod data { } } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, thiserror::Error)] -pub enum P2pNetworkError { - #[error("select error")] - SelectError, - #[error(transparent)] - IdentifyStreamError(#[from] P2pNetworkIdentifyStreamError), - #[error(transparent)] - KademliaIncomingStreamError(#[from] P2pNetworkKadIncomingStreamError), - #[error(transparent)] - KademliaOutgoingStreamError(#[from] P2pNetworkKadOutgoingStreamError), -} - /// Errors that might happen while handling protobuf messages received via a stream. #[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] pub enum P2pNetworkStreamProtobufError { diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs b/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs index 5b3fe0b357..791bda755a 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_effects.rs @@ -8,7 +8,7 @@ use crate::{ incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction, P2pConnectionState, }, - disconnection::{P2pDisconnectionAction, P2pDisconnectionReason}, + disconnection::P2pDisconnectionAction, identify::P2pIdentifyAction, network::identify::P2pNetworkIdentifyStreamAction, request::{P2pNetworkKadRequestState, P2pNetworkKadRequestStatus}, @@ -322,13 +322,7 @@ impl P2pNetworkSchedulerAction { store.dispatch(P2pNetworkSchedulerAction::Prune { addr }); - if !matches!( - reason, - P2pNetworkConnectionCloseReason::Disconnect( - P2pDisconnectionReason::NetworkError(P2pNetworkError::SelectError) - ) - ) && reason.is_disconnected() - { + if reason.is_disconnected() { // statemachine behaviour should continue with this, i.e. dispatch P2pDisconnectionAction::Finish return; } diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs b/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs index d4987cead7..83e5d40e4a 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs @@ -139,17 +139,12 @@ impl P2pNetworkSchedulerState { connection.streams.remove(stream_id); } - connection.closed = Some(P2pNetworkConnectionCloseReason::Disconnect( - P2pNetworkError::SelectError.into(), - )); + connection.closed = Some(P2pNetworkConnectionError::SelectError.into()); + } + if let Some(connection) = self.connections.get_mut(addr) { + connection.closed = Some(P2pNetworkConnectionError::SelectError.into()); } else { - if let Some(connection) = self.connections.get_mut(addr) { - connection.closed = Some(P2pNetworkConnectionCloseReason::Disconnect( - P2pNetworkError::SelectError.into(), - )); - } else { - unreachable!() - } + unreachable!() } } P2pNetworkSchedulerAction::YamuxDidInit { addr, .. } => { diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_state.rs b/p2p/src/network/scheduler/p2p_network_scheduler_state.rs index 0016405b2f..3bc6fd4408 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_state.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_state.rs @@ -94,7 +94,7 @@ impl P2pNetworkConnectionCloseReason { } /// P2p connection error. -#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)] pub enum P2pNetworkConnectionError { #[error("mio error: {0}")] MioError(String), @@ -102,6 +102,14 @@ pub enum P2pNetworkConnectionError { Noise(#[from] NoiseError), #[error("remote peer closed connection")] RemoteClosed, + #[error("select protocol error")] + SelectError, + #[error(transparent)] + IdentifyStreamError(#[from] P2pNetworkIdentifyStreamError), + #[error(transparent)] + KademliaIncomingStreamError(#[from] P2pNetworkKadIncomingStreamError), + #[error(transparent)] + KademliaOutgoingStreamError(#[from] P2pNetworkKadOutgoingStreamError), } #[derive(Serialize, Deserialize, Debug, Clone)]