Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 3 additions & 47 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions cli/src/commands/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use node::p2p::channels::ChannelId;
use node::p2p::connection::outgoing::P2pConnectionOutgoingInitOpts;
use node::p2p::identity::SecretKey;
use node::p2p::service_impl::webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p;
use node::p2p::{P2pConfig, P2pTimeouts};
use node::p2p::{P2pConfig, P2pLimits, P2pTimeouts};
use node::service::{Recorder, Service};
use node::snark::{get_srs, get_verifier_index, VerifierKind};
use node::stats::Stats;
Expand Down Expand Up @@ -216,14 +216,14 @@ impl Node {
listen_port: self.port,
identity_pub_key: pub_key,
initial_peers: self.peers,
max_peers: 100,
ask_initial_peers_interval: Duration::from_secs(3600),
enabled_channels: ChannelId::for_libp2p().collect(),
timeouts: P2pTimeouts::default(),
peer_discovery: !self.no_peers_discovery,
initial_time: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("linear time"),
timeouts: P2pTimeouts::default(),
limits: P2pLimits::default().with_max_peers(Some(100)),
},
transition_frontier,
block_producer: block_producer.clone().map(|(config, _)| config),
Expand Down
6 changes: 3 additions & 3 deletions node/testing/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use node::core::warn;
use node::p2p::service_impl::{
webrtc::P2pServiceCtx, webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p,
};
use node::p2p::{P2pConnectionEvent, P2pEvent, PeerId};
use node::p2p::{P2pConnectionEvent, P2pEvent, P2pLimits, PeerId};
use node::snark::{VerifierIndex, VerifierSRS};
use node::{
event_source::Event,
Expand Down Expand Up @@ -278,11 +278,11 @@ impl Cluster {
listen_port: http_port,
identity_pub_key: pub_key,
initial_peers,
max_peers: testing_config.max_peers,
ask_initial_peers_interval: testing_config.ask_initial_peers_interval,
enabled_channels: ChannelId::iter_all().collect(),
timeouts: testing_config.timeouts,
peer_discovery: true,
timeouts: testing_config.timeouts,
limits: P2pLimits::default().with_max_peers(Some(testing_config.max_peers)),
initial_time: testing_config
.initial_time
.checked_sub(redux::Timestamp::ZERO)
Expand Down
7 changes: 5 additions & 2 deletions node/testing/src/scenarios/p2p/basic_connection_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,10 @@ impl MaxNumberOfPeersIncoming {
// check that the number of ready peers does not exceed the maximal allowed number
let state = driver.inner().node(node_ut).unwrap().state();
let count = state.p2p.ready_peers_iter().count();
assert!(count <= MAX.into(), "max number of peers exceeded: {count}");
assert!(
count <= usize::from(MAX),
"max number of peers exceeded: {count}"
);

// check that the number of nodes with the node as their peer does not exceed the maximal allowed number
let peers_connected = || {
Expand All @@ -263,7 +266,7 @@ impl MaxNumberOfPeersIncoming {
})
};
assert!(
peers_connected().count() <= MAX.into(),
peers_connected().count() <= usize::from(MAX),
"peers connections to the node exceed the max number of connections: {}",
peers_connected().count()
);
Expand Down
5 changes: 1 addition & 4 deletions p2p/src/disconnection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};

use crate::{channels::ChannelId, connection::RejectionReason};

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, thiserror::Error)]
pub enum P2pDisconnectionReason {
#[error("message is unexpected for channel {0}")]
P2pChannelMsgUnexpected(ChannelId),
Expand All @@ -35,9 +35,6 @@ pub enum P2pDisconnectionReason {
#[error("duplicate connection")]
DuplicateConnection,

#[error("select error")]
SelectError,

#[error("timeout")]
Timeout,
}
2 changes: 1 addition & 1 deletion p2p/src/identity/peer_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl fmt::Debug for PeerId {
}
}

#[derive(Clone, Debug, thiserror::Error, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, thiserror::Error, Serialize, Deserialize)]
pub enum PeerIdFromLibp2pPeerId {
#[error("error decoding public key from protobuf: {0}")]
Protobuf(String),
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/network/identify/p2p_network_identify_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct P2pNetworkIdentify {
pub protocols: Vec<token::StreamKind>,
}

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, thiserror::Error)]
pub enum P2pNetworkIdentifyFromMessageError {
#[error("cant parse protocol: {0}")]
UnsupportedProtocol(String),
Expand Down
7 changes: 5 additions & 2 deletions p2p/src/network/identify/p2p_network_identify_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use redux::ActionWithMeta;

use crate::P2pLimits;

use super::stream::P2pNetworkIdentifyStreamAction;

impl super::P2pNetworkIdentifyState {
pub fn reducer(
&mut self,
action: ActionWithMeta<&super::P2pNetworkIdentifyAction>,
limits: &P2pLimits,
) -> Result<(), String> {
let (action, meta) = action.split();
match action {
Expand All @@ -16,7 +19,7 @@ impl super::P2pNetworkIdentifyState {
.map_err(|stream| {
format!("Identify stream already exists for action {action:?}: {stream:?}")
})
.and_then(|stream| stream.reducer(meta.with_action(action))),
.and_then(|stream| stream.reducer(meta.with_action(action), limits)),
super::P2pNetworkIdentifyAction::Stream(
action @ P2pNetworkIdentifyStreamAction::Prune { .. },
) => self
Expand All @@ -26,7 +29,7 @@ impl super::P2pNetworkIdentifyState {
super::P2pNetworkIdentifyAction::Stream(action) => self
.find_identify_stream_state_mut(action.peer_id(), action.stream_id())
.ok_or_else(|| format!("Identify stream not found for action {action:?}"))
.and_then(|stream| stream.reducer(meta.with_action(action))),
.and_then(|stream| stream.reducer(meta.with_action(action), limits)),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use super::{
super::{pb, P2pNetworkIdentify},
P2pNetworkIdentifyStreamAction,
};
use crate::{identify::P2pIdentifyAction, token, Data, P2pNetworkService, P2pNetworkYamuxAction};
use crate::{
identify::P2pIdentifyAction, network::identify::stream::P2pNetworkIdentifyStreamError, token,
Data, P2pNetworkSchedulerAction, P2pNetworkService, P2pNetworkYamuxAction,
};

fn get_addrs<I, S>(addr: &SocketAddr, net_svc: &mut S) -> I
where
Expand Down Expand Up @@ -171,7 +174,11 @@ impl P2pNetworkIdentifyStreamAction {
Ok(())
}
S::Error(err) => {
warn!(meta.time(); summary = "error handling Identify action", error = err, action = format!("{self:?}"));
warn!(meta.time(); summary = "error handling Identify action", error = display(err));
store.dispatch(P2pNetworkSchedulerAction::Error {
addr,
error: P2pNetworkIdentifyStreamError::from(err.clone()).into(),
});
Ok(())
}
_ => unimplemented!(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use super::{
P2pNetworkIdentifyStreamAction, P2pNetworkIdentifyStreamKind, P2pNetworkIdentifyStreamState,
};
use crate::network::identify::{pb::Identify, P2pNetworkIdentify};
use crate::{
network::identify::{pb::Identify, P2pNetworkIdentify},
P2pLimits, P2pNetworkStreamProtobufError,
};
use prost::Message;
use quick_protobuf::BytesReader;
use redux::ActionWithMeta;
Expand All @@ -10,6 +13,7 @@ impl P2pNetworkIdentifyStreamState {
pub fn reducer(
&mut self,
action: ActionWithMeta<&P2pNetworkIdentifyStreamAction>,
limits: &P2pLimits,
) -> Result<(), String> {
use super::P2pNetworkIdentifyStreamAction as A;
use super::P2pNetworkIdentifyStreamState as S;
Expand Down Expand Up @@ -38,13 +42,16 @@ impl P2pNetworkIdentifyStreamState {
let data = &data.0;
let mut reader = BytesReader::from_bytes(data);
let Ok(len) = reader.read_varint32(data).map(|v| v as usize) else {
*self = S::Error("error reading message length".to_owned());
*self = S::Error(P2pNetworkStreamProtobufError::MessageLength);
return Ok(());
};

// TODO: implement as configuration option
if len > 0x1000 {
*self = S::Error(format!("Identify message is too long ({})", len));
if len > limits.identify_message() {
*self = S::Error(P2pNetworkStreamProtobufError::Limit(
len,
limits.identify_message(),
));
return Ok(());
}

Expand Down Expand Up @@ -94,19 +101,17 @@ impl P2pNetworkIdentifyStreamState {
let message = match Identify::decode(&data[..len]) {
Ok(v) => v,
Err(e) => {
*self = P2pNetworkIdentifyStreamState::Error(format!(
"error reading protobuf message: {e}"
));
*self = P2pNetworkIdentifyStreamState::Error(
P2pNetworkStreamProtobufError::Message(e.to_string()),
);
return Ok(());
}
};

let data = match P2pNetworkIdentify::try_from(message.clone()) {
Ok(v) => v,
Err(e) => {
*self = P2pNetworkIdentifyStreamState::Error(format!(
"error converting protobuf message: {e}"
));
*self = P2pNetworkIdentifyStreamState::Error(e.into());
return Ok(());
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::network::identify::P2pNetworkIdentify;
use crate::{
network::identify::{P2pNetworkIdentify, P2pNetworkIdentifyFromMessageError},
P2pNetworkStreamProtobufError,
};
use serde::{Deserialize, Serialize};

#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -36,7 +39,7 @@ pub enum P2pNetworkIdentifyStreamState {
data: P2pNetworkIdentify,
},
/// Error handling the stream.
Error(String),
Error(P2pNetworkStreamProtobufError<P2pNetworkIdentifyFromMessageError>),
}

impl P2pNetworkIdentifyStreamState {
Expand All @@ -50,3 +53,9 @@ impl From<P2pNetworkIdentify> for P2pNetworkIdentifyStreamState {
P2pNetworkIdentifyStreamState::IdentifyReceived { data }
}
}

#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)]
#[error("identify stream: {0}")]
pub struct P2pNetworkIdentifyStreamError(
#[from] P2pNetworkStreamProtobufError<P2pNetworkIdentifyFromMessageError>,
);
2 changes: 1 addition & 1 deletion p2p/src/network/kad/p2p_network_kad_internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ impl P2pNetworkKadEntry {
}
}

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, thiserror::Error)]
pub enum P2pNetworkKadEntryTryFromError {
#[error(transparent)]
PeerId(#[from] P2pNetworkKademliaPeerIdError),
Expand Down
6 changes: 3 additions & 3 deletions p2p/src/network/kad/p2p_network_kad_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl P2pNetworkKademliaRpcRequest {
}
}

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
#[derive(Clone, Debug, Serialize, PartialEq, Deserialize, thiserror::Error)]
pub enum P2pNetworkKademliaPeerIdError {
#[error("error decoding PeerId from bytes: lenght {0} while expected 32")]
Parse(String),
Expand Down Expand Up @@ -99,7 +99,7 @@ pub enum P2pNetworkKademliaRpcPeerTryFromError {
Multiaddr(#[from] P2pNetworkKademliaMultiaddrError),
}

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, thiserror::Error)]
#[error("error decoding Multiaddr from bytes: {0}")]
pub struct P2pNetworkKademliaMultiaddrError(String);

Expand All @@ -115,7 +115,7 @@ impl From<multiaddr::Error> for P2pNetworkKademliaMultiaddrError {
}
}

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, thiserror::Error)]
pub enum P2pNetworkKademliaRpcFromMessageError {
#[error(transparent)]
PeerId(#[from] P2pNetworkKademliaPeerIdError),
Expand Down
Loading