Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions node/src/event_source/event_source_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ pub fn event_source_effects<S: Service>(store: &mut Store<S>, action: EventSourc
});
}
}
MioEvent::ConnectionDidCloseOnDemand(addr) => {
store.dispatch(P2pNetworkSchedulerAction::Prune { addr });
}
},
P2pEvent::Connection(e) => match e {
P2pConnectionEvent::OfferSdpReady(peer_id, res) => match res {
Expand Down
3 changes: 3 additions & 0 deletions node/src/rpc/rpc_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,9 @@ fn collect_rpc_peers_info(state: &crate::State) -> Vec<RpcPeerInfo> {
(PeerConnectionStatus::Connecting, i.time().into())
}
},
p2p::P2pPeerStatus::Disconnecting { time } => {
(PeerConnectionStatus::Disconnected, (*time).into())
}
p2p::P2pPeerStatus::Disconnected { time } => {
(PeerConnectionStatus::Disconnected, (*time).into())
}
Expand Down
1 change: 1 addition & 0 deletions node/testing/src/scenarios/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ impl ConnectionPredicate for (ClusterNodeId, ConnectionPredicates) {
peer_id == pid
&& match peer_status {
P2pPeerStatus::Connecting(c) => c.is_error(),
P2pPeerStatus::Disconnecting { .. } => true,
P2pPeerStatus::Disconnected { .. } => true,
P2pPeerStatus::Ready(_) => true,
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ redux = { workspace = true, features=["serializable_callbacks"] }
tokio = { version = "1.26", features = ["rt"] }
webrtc = { git = "https://github.com/openmina/webrtc.git", branch = "openmina-v0.11.0", optional = true }
hyper = { version = "0.14.25", features = ["client", "http1", "tcp"] }
mio = { version = "0.8.11", features = ["os-poll"] }
mio = { version = "0.8.11", features = ["os-poll", "net"] }
libc = { version = "0.2.151" }
local-ip-address = "0.6.1"

Expand Down
15 changes: 9 additions & 6 deletions p2p/src/connection/outgoing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,15 @@ pub(crate) mod libp2p_opts {
}

pub fn matches_socket_addr(&self, addr: SocketAddr) -> bool {
self.port == addr.port()
&& match (&self.host, addr) {
(Host::Ipv4(ip), SocketAddr::V4(addr)) => ip == addr.ip(),
(Host::Ipv6(ip), SocketAddr::V6(addr)) => ip == addr.ip(),
_ => false,
}
self.port == addr.port() && self.matches_socket_ip(addr)
}

pub fn matches_socket_ip(&self, addr: SocketAddr) -> bool {
match (&self.host, addr) {
(Host::Ipv4(ip), SocketAddr::V4(addr)) => ip == addr.ip(),
(Host::Ipv6(ip), SocketAddr::V6(addr)) => ip == addr.ip(),
_ => false,
}
}
}

Expand Down
19 changes: 13 additions & 6 deletions p2p/src/disconnection/p2p_disconnection_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,32 @@ impl P2pDisconnectedState {

match action {
P2pDisconnectionAction::Init { peer_id, reason } => {
let (dispatcher, state) = state_context.into_dispatcher_and_state();
let p2p_state: &P2pState = state.substate()?;

#[cfg(feature = "p2p-libp2p")]
if p2p_state.is_libp2p_peer(peer_id) {
if let Some((addr, _)) = p2p_state
if let Some((&addr, _)) = p2p_state
.network
.scheduler
.connections
.iter()
.find(|(_, conn_state)| conn_state.peer_id() == Some(peer_id))
{
let Some(peer) = p2p_state.peers.get_mut(peer_id) else {
bug_condition!("Invalid state for: `P2pDisconnectionAction::Finish`");
return Ok(());
};
peer.status = P2pPeerStatus::Disconnecting { time: meta.time() };

let dispatcher = state_context.into_dispatcher();
dispatcher.push(P2pNetworkSchedulerAction::Disconnect {
addr: *addr,
addr,
reason: reason.clone(),
});
dispatcher.push(P2pDisconnectionAction::Finish { peer_id: *peer_id });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it gets insta finished, without any event from mio, then disconnecting state seems useless.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it for further improvement. I will make Finish an effect of Prune, but not now.

}
return Ok(());
}

let dispatcher = state_context.into_dispatcher();
dispatcher.push(P2pDisconnectionEffectfulAction::Init { peer_id: *peer_id });
Ok(())
}
Expand All @@ -65,7 +70,9 @@ impl P2pDisconnectedState {
.scheduler
.connections
.iter()
.any(|(_addr, conn_state)| conn_state.peer_id() == Some(peer_id))
.any(|(_addr, conn_state)| {
conn_state.peer_id() == Some(peer_id) && conn_state.closed.is_none()
})
{
return Ok(());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,13 @@ impl P2pNetworkKadRequestState {
Some(P2pPeerState { status, .. }) if status.as_ready().is_none() => {
on_connection_in_progress(dispatcher)
}
_ => on_connection_established(dispatcher),
Some(P2pPeerState { status, .. }) if status.as_ready().is_some() => {
on_connection_established(dispatcher)
}
_ => {
bug_condition!("state must be either ready or not ready, peer {peer_id}");
Ok(())
}
}
}
P2pNetworkKadRequestAction::PeerIsConnecting { .. } => {
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/network/noise/p2p_network_noise_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl P2pNetworkNoiseState {
.get_substate_mut()?
.connection_state_mut(action.addr())
.and_then(|c| c.noise_state_mut())
.ok_or_else(|| "Invalid noise state".to_owned())?;
.ok_or_else(|| format!("Invalid noise state {}", action.addr()))?;

match action {
P2pNetworkNoiseAction::Init {
Expand Down
3 changes: 2 additions & 1 deletion p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ impl P2pNetworkPubsubState {

fn reduce_incoming_data(&mut self, peer_id: &PeerId, data: &Data) -> Result<(), String> {
let Some(state) = self.clients.get_mut(peer_id) else {
bug_condition!("State not found for action: P2pNetworkPubsubAction::IncomingData");
// TODO: investigate, cannot reproduce this
// bug_condition!("State not found for action: P2pNetworkPubsubAction::IncomingData");
return Ok(());
};
let slice = if state.buffer.is_empty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ impl P2pNetworkPubsubEffectfulAction {
seen_limit,
} => {
let Some(state) = state.clients.get(&peer_id) else {
bug_condition!("{:?} not found in state.clients", peer_id);
// TODO: investigate, cannot reproduce this
// bug_condition!("{:?} not found in state.clients", peer_id);
return;
};
let messages = state.incoming_messages.clone();
Expand Down
7 changes: 4 additions & 3 deletions p2p/src/network/scheduler/p2p_network_scheduler_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,15 @@ impl redux::EnablingCondition<P2pState> for P2pNetworkSchedulerAction {
!state.network.scheduler.connections.contains_key(addr)
})
}
P2pNetworkSchedulerAction::OutgoingConnect { addr } => !state
P2pNetworkSchedulerAction::OutgoingConnect { addr } => state
.network
.scheduler
.connections
.contains_key(&ConnectionAddr {
.get(&ConnectionAddr {
sock_addr: *addr,
incoming: false,
}),
})
.map_or(true, |v| v.closed.is_some()),
P2pNetworkSchedulerAction::OutgoingDidConnect { addr, .. } => state
.network
.scheduler
Expand Down
15 changes: 9 additions & 6 deletions p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ impl P2pNetworkSchedulerState {
addr: *addr,
reason: reason.clone(),
});

Ok(())
}
P2pNetworkSchedulerAction::Error { addr, error } => {
Expand Down Expand Up @@ -352,7 +353,6 @@ impl P2pNetworkSchedulerState {
let state: &P2pState = state.substate()?;

let peer_with_state = state.peer_with_connection(*addr);
dispatcher.push(P2pNetworkSchedulerAction::Prune { addr: *addr });

if reason.is_disconnected() {
// statemachine behaviour should continue with this, i.e. dispatch P2pDisconnectionAction::Finish
Expand All @@ -379,6 +379,7 @@ impl P2pNetworkSchedulerState {
error: reason.to_string(),
});
}
P2pPeerStatus::Disconnecting { .. } => {}
P2pPeerStatus::Disconnected { .. } => {
// sanity check, should be incoming connection
if !incoming {
Expand All @@ -405,13 +406,15 @@ impl P2pNetworkSchedulerState {
Ok(())
}
P2pNetworkSchedulerAction::Prune { addr } => {
let _ = scheduler_state.connections.remove(addr);
Ok(())
}
P2pNetworkSchedulerAction::PruneStreams { peer_id } => {
scheduler_state.prune_peer_state(peer_id);
if let Some(old) = scheduler_state.connections.remove(addr) {
if let Some(peer_id) = old.peer_id() {
scheduler_state.prune_peer_state(peer_id);
}
}
Ok(())
}
// TODO: remove the action
P2pNetworkSchedulerAction::PruneStreams { .. } => Ok(()),
P2pNetworkSchedulerAction::PruneStream { peer_id, stream_id } => {
let Some((_, conn_state)) = scheduler_state
.connections
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/network/yamux/p2p_network_yamux_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl P2pNetworkYamuxState {
let connection_state =
<State as SubstateAccess<P2pNetworkSchedulerState>>::substate(state)?
.connection_state(&addr)
.ok_or_else(|| "Connection not found".to_owned())?;
.ok_or_else(|| format!("Connection not found {}", action.addr()))?;

let stream = connection_state
.yamux_state()
Expand Down
6 changes: 3 additions & 3 deletions p2p/src/p2p_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use crate::{
};

pub const DEVNET_SEEDS: &[&str] = &[
"/ip4/34.48.73.58/tcp/10003/p2p/12D3KooWAdgYL6hv18M3iDBdaK1dRygPivSfAfBNDzie6YqydVbs",
"/ip4/35.245.82.250/tcp/10003/p2p/12D3KooWLjs54xHzVmMmGYb7W5RVibqbwD1co7M2ZMfPgPm7iAag",
"/ip4/34.118.163.79/tcp/10003/p2p/12D3KooWEiGVAFC7curXWXiGZyMWnZK9h8BKr88U8D5PKV3dXciv",
"/ip4/34.45.167.81/tcp/10003/p2p/12D3KooWAdgYL6hv18M3iDBdaK1dRygPivSfAfBNDzie6YqydVbs",
"/ip4/34.28.194.121/tcp/10003/p2p/12D3KooWLjs54xHzVmMmGYb7W5RVibqbwD1co7M2ZMfPgPm7iAag",
"/ip4/34.44.189.148/tcp/10003/p2p/12D3KooWEiGVAFC7curXWXiGZyMWnZK9h8BKr88U8D5PKV3dXciv",
];

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down
6 changes: 6 additions & 0 deletions p2p/src/p2p_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub enum MioEvent {

/// The remote peer is disconnected gracefully or with an error.
ConnectionDidClose(ConnectionAddr, Result<(), String>),

/// The remote peer is disconnected by our node.
ConnectionDidCloseOnDemand(ConnectionAddr),
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -266,6 +269,9 @@ impl fmt::Display for MioEvent {
Self::ConnectionDidClose(addr, res) => {
write!(f, "ConnectionDidClose, {addr}, {}", res_kind(res))
}
Self::ConnectionDidCloseOnDemand(addr) => {
write!(f, "ConnectionDidCloseOnDemand, {addr}")
}
}
}
}
8 changes: 7 additions & 1 deletion p2p/src/p2p_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ impl P2pPeerState {
P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(
P2pConnectionOutgoingState::Error { time, .. },
)) => is_time_passed(now, *time, timeouts.outgoing_error_reconnect_timeout),
P2pPeerStatus::Disconnected { time } => {
P2pPeerStatus::Disconnected { time } | P2pPeerStatus::Disconnecting { time } => {
*time == Timestamp::ZERO
|| is_time_passed(now, *time, timeouts.reconnect_timeout)
}
Expand All @@ -383,6 +383,7 @@ impl P2pPeerState {
#[serde(tag = "state")]
pub enum P2pPeerStatus {
Connecting(P2pConnectionState),
Disconnecting { time: redux::Timestamp },
Disconnected { time: redux::Timestamp },

Ready(P2pPeerStatusReady),
Expand All @@ -402,10 +403,15 @@ impl P2pPeerStatus {
match self {
Self::Connecting(s) => !s.is_error(),
Self::Ready(_) => true,
Self::Disconnecting { .. } => false,
Self::Disconnected { .. } => false,
}
}

pub fn is_disconnected_or_disconnecting(&self) -> bool {
matches!(self, Self::Disconnecting { .. } | Self::Disconnected { .. })
}

pub fn as_connecting(&self) -> Option<&P2pConnectionState> {
match self {
Self::Connecting(v) => Some(v),
Expand Down
18 changes: 7 additions & 11 deletions p2p/src/service_impl/mio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,11 +523,6 @@ where
}
self.connections.insert(addr, connection);
}
} else {
self.send(MioEvent::IncomingDataDidReceive(
addr,
Err("not connected".to_string()),
));
}
}
Send(addr, buf) => {
Expand Down Expand Up @@ -563,16 +558,17 @@ where
self.send(MioEvent::ConnectionDidClose(addr, Err(err.to_string())));
}
}
} else {
self.send(MioEvent::OutgoingDataDidSend(
addr,
Err("not connected".to_string()),
));
}
}
Disconnect(addr) => {
// drop the connection and destructor will close it
self.connections.remove(&addr);
if let Some(mut cn) = self.connections.remove(&addr) {
self.poll
.registry()
.deregister(&mut cn.stream)
.unwrap_or_default();
}
self.send(MioEvent::ConnectionDidCloseOnDemand(addr));
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions p2p/testing/src/redux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ pub(super) fn event_effect(store: &mut crate::redux::Store, event: P2pEvent) ->
)
}
}
MioEvent::ConnectionDidCloseOnDemand(addr) => {
SubStore::dispatch(store, P2pNetworkSchedulerAction::Prune { addr })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe u should dispatch P2pDisconnectionAction::Finish as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact I'd put SubStore::dispatch(store, P2pNetworkSchedulerAction::Prune { addr }) action in effects of the P2pDisconnectionAction::Finish

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is too big change and require more debugging to make it work properly. Let's allow connection where conn_state.closed is Some(..) to exist after Finish done.

}
},
_ => false,
}
Expand Down
Loading