diff --git a/node/src/event_source/event_source_effects.rs b/node/src/event_source/event_source_effects.rs index 8fd60ee704..570a924c8b 100644 --- a/node/src/event_source/event_source_effects.rs +++ b/node/src/event_source/event_source_effects.rs @@ -115,6 +115,9 @@ pub fn event_source_effects(store: &mut Store, action: EventSourc }); } } + MioEvent::ConnectionDidCloseOnDemand(addr) => { + store.dispatch(P2pNetworkSchedulerAction::Prune { addr }); + } }, P2pEvent::Connection(e) => match e { P2pConnectionEvent::OfferSdpReady(peer_id, res) => match res { diff --git a/node/src/rpc/rpc_effects.rs b/node/src/rpc/rpc_effects.rs index d5783b77aa..48f16c85b1 100644 --- a/node/src/rpc/rpc_effects.rs +++ b/node/src/rpc/rpc_effects.rs @@ -811,6 +811,9 @@ fn collect_rpc_peers_info(state: &crate::State) -> Vec { (PeerConnectionStatus::Connecting, i.time().into()) } }, + p2p::P2pPeerStatus::Disconnecting { time } => { + (PeerConnectionStatus::Disconnected, (*time).into()) + } p2p::P2pPeerStatus::Disconnected { time } => { (PeerConnectionStatus::Disconnected, (*time).into()) } diff --git a/node/testing/src/scenarios/driver.rs b/node/testing/src/scenarios/driver.rs index 0ae57e8ead..2a33c94b12 100644 --- a/node/testing/src/scenarios/driver.rs +++ b/node/testing/src/scenarios/driver.rs @@ -609,6 +609,7 @@ impl ConnectionPredicate for (ClusterNodeId, ConnectionPredicates) { peer_id == pid && match peer_status { P2pPeerStatus::Connecting(c) => c.is_error(), + P2pPeerStatus::Disconnecting { .. } => true, P2pPeerStatus::Disconnected { .. } => true, P2pPeerStatus::Ready(_) => true, } diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 38670a857b..10521d3ebf 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -68,7 +68,7 @@ redux = { workspace = true, features=["serializable_callbacks"] } tokio = { version = "1.26", features = ["rt"] } webrtc = { git = "https://github.com/openmina/webrtc.git", branch = "openmina-v0.11.0", optional = true } hyper = { version = "0.14.25", features = ["client", "http1", "tcp"] } -mio = { version = "0.8.11", features = ["os-poll"] } +mio = { version = "0.8.11", features = ["os-poll", "net"] } libc = { version = "0.2.151" } local-ip-address = "0.6.1" diff --git a/p2p/src/connection/outgoing/mod.rs b/p2p/src/connection/outgoing/mod.rs index 1048edf9de..aa13628705 100644 --- a/p2p/src/connection/outgoing/mod.rs +++ b/p2p/src/connection/outgoing/mod.rs @@ -66,12 +66,15 @@ pub(crate) mod libp2p_opts { } pub fn matches_socket_addr(&self, addr: SocketAddr) -> bool { - self.port == addr.port() - && match (&self.host, addr) { - (Host::Ipv4(ip), SocketAddr::V4(addr)) => ip == addr.ip(), - (Host::Ipv6(ip), SocketAddr::V6(addr)) => ip == addr.ip(), - _ => false, - } + self.port == addr.port() && self.matches_socket_ip(addr) + } + + pub fn matches_socket_ip(&self, addr: SocketAddr) -> bool { + match (&self.host, addr) { + (Host::Ipv4(ip), SocketAddr::V4(addr)) => ip == addr.ip(), + (Host::Ipv6(ip), SocketAddr::V6(addr)) => ip == addr.ip(), + _ => false, + } } } diff --git a/p2p/src/disconnection/p2p_disconnection_reducer.rs b/p2p/src/disconnection/p2p_disconnection_reducer.rs index 0d8fc05d09..630594a978 100644 --- a/p2p/src/disconnection/p2p_disconnection_reducer.rs +++ b/p2p/src/disconnection/p2p_disconnection_reducer.rs @@ -22,20 +22,24 @@ impl P2pDisconnectedState { match action { P2pDisconnectionAction::Init { peer_id, reason } => { - let (dispatcher, state) = state_context.into_dispatcher_and_state(); - let p2p_state: &P2pState = state.substate()?; - #[cfg(feature = "p2p-libp2p")] if p2p_state.is_libp2p_peer(peer_id) { - if let Some((addr, _)) = p2p_state + if let Some((&addr, _)) = p2p_state .network .scheduler .connections .iter() .find(|(_, conn_state)| conn_state.peer_id() == Some(peer_id)) { + let Some(peer) = p2p_state.peers.get_mut(peer_id) else { + bug_condition!("Invalid state for: `P2pDisconnectionAction::Finish`"); + return Ok(()); + }; + peer.status = P2pPeerStatus::Disconnecting { time: meta.time() }; + + let dispatcher = state_context.into_dispatcher(); dispatcher.push(P2pNetworkSchedulerAction::Disconnect { - addr: *addr, + addr, reason: reason.clone(), }); dispatcher.push(P2pDisconnectionAction::Finish { peer_id: *peer_id }); @@ -43,6 +47,7 @@ impl P2pDisconnectedState { return Ok(()); } + let dispatcher = state_context.into_dispatcher(); dispatcher.push(P2pDisconnectionEffectfulAction::Init { peer_id: *peer_id }); Ok(()) } @@ -65,7 +70,9 @@ impl P2pDisconnectedState { .scheduler .connections .iter() - .any(|(_addr, conn_state)| conn_state.peer_id() == Some(peer_id)) + .any(|(_addr, conn_state)| { + conn_state.peer_id() == Some(peer_id) && conn_state.closed.is_none() + }) { return Ok(()); } diff --git a/p2p/src/network/kad/request/p2p_network_kad_request_reducer.rs b/p2p/src/network/kad/request/p2p_network_kad_request_reducer.rs index 72a1701f64..5480459196 100644 --- a/p2p/src/network/kad/request/p2p_network_kad_request_reducer.rs +++ b/p2p/src/network/kad/request/p2p_network_kad_request_reducer.rs @@ -121,7 +121,13 @@ impl P2pNetworkKadRequestState { Some(P2pPeerState { status, .. }) if status.as_ready().is_none() => { on_connection_in_progress(dispatcher) } - _ => on_connection_established(dispatcher), + Some(P2pPeerState { status, .. }) if status.as_ready().is_some() => { + on_connection_established(dispatcher) + } + _ => { + bug_condition!("state must be either ready or not ready, peer {peer_id}"); + Ok(()) + } } } P2pNetworkKadRequestAction::PeerIsConnecting { .. } => { diff --git a/p2p/src/network/noise/p2p_network_noise_reducer.rs b/p2p/src/network/noise/p2p_network_noise_reducer.rs index a218a282e4..0b199b0f81 100644 --- a/p2p/src/network/noise/p2p_network_noise_reducer.rs +++ b/p2p/src/network/noise/p2p_network_noise_reducer.rs @@ -33,7 +33,7 @@ impl P2pNetworkNoiseState { .get_substate_mut()? .connection_state_mut(action.addr()) .and_then(|c| c.noise_state_mut()) - .ok_or_else(|| "Invalid noise state".to_owned())?; + .ok_or_else(|| format!("Invalid noise state {}", action.addr()))?; match action { P2pNetworkNoiseAction::Init { diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs index cd11a60169..b09e8793de 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs @@ -458,7 +458,8 @@ impl P2pNetworkPubsubState { fn reduce_incoming_data(&mut self, peer_id: &PeerId, data: &Data) -> Result<(), String> { let Some(state) = self.clients.get_mut(peer_id) else { - bug_condition!("State not found for action: P2pNetworkPubsubAction::IncomingData"); + // TODO: investigate, cannot reproduce this + // bug_condition!("State not found for action: P2pNetworkPubsubAction::IncomingData"); return Ok(()); }; let slice = if state.buffer.is_empty() { diff --git a/p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_effects.rs b/p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_effects.rs index 1fd5392948..8bb010578e 100644 --- a/p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_effects.rs +++ b/p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_effects.rs @@ -33,7 +33,8 @@ impl P2pNetworkPubsubEffectfulAction { seen_limit, } => { let Some(state) = state.clients.get(&peer_id) else { - bug_condition!("{:?} not found in state.clients", peer_id); + // TODO: investigate, cannot reproduce this + // bug_condition!("{:?} not found in state.clients", peer_id); return; }; let messages = state.incoming_messages.clone(); diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_actions.rs b/p2p/src/network/scheduler/p2p_network_scheduler_actions.rs index 80bfc19807..45681cf7ff 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_actions.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_actions.rs @@ -133,14 +133,15 @@ impl redux::EnablingCondition for P2pNetworkSchedulerAction { !state.network.scheduler.connections.contains_key(addr) }) } - P2pNetworkSchedulerAction::OutgoingConnect { addr } => !state + P2pNetworkSchedulerAction::OutgoingConnect { addr } => state .network .scheduler .connections - .contains_key(&ConnectionAddr { + .get(&ConnectionAddr { sock_addr: *addr, incoming: false, - }), + }) + .map_or(true, |v| v.closed.is_some()), P2pNetworkSchedulerAction::OutgoingDidConnect { addr, .. } => state .network .scheduler diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs b/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs index 0fe5313f31..d5821f2444 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs @@ -310,6 +310,7 @@ impl P2pNetworkSchedulerState { addr: *addr, reason: reason.clone(), }); + Ok(()) } P2pNetworkSchedulerAction::Error { addr, error } => { @@ -352,7 +353,6 @@ impl P2pNetworkSchedulerState { let state: &P2pState = state.substate()?; let peer_with_state = state.peer_with_connection(*addr); - dispatcher.push(P2pNetworkSchedulerAction::Prune { addr: *addr }); if reason.is_disconnected() { // statemachine behaviour should continue with this, i.e. dispatch P2pDisconnectionAction::Finish @@ -379,6 +379,7 @@ impl P2pNetworkSchedulerState { error: reason.to_string(), }); } + P2pPeerStatus::Disconnecting { .. } => {} P2pPeerStatus::Disconnected { .. } => { // sanity check, should be incoming connection if !incoming { @@ -405,13 +406,15 @@ impl P2pNetworkSchedulerState { Ok(()) } P2pNetworkSchedulerAction::Prune { addr } => { - let _ = scheduler_state.connections.remove(addr); - Ok(()) - } - P2pNetworkSchedulerAction::PruneStreams { peer_id } => { - scheduler_state.prune_peer_state(peer_id); + if let Some(old) = scheduler_state.connections.remove(addr) { + if let Some(peer_id) = old.peer_id() { + scheduler_state.prune_peer_state(peer_id); + } + } Ok(()) } + // TODO: remove the action + P2pNetworkSchedulerAction::PruneStreams { .. } => Ok(()), P2pNetworkSchedulerAction::PruneStream { peer_id, stream_id } => { let Some((_, conn_state)) = scheduler_state .connections diff --git a/p2p/src/network/yamux/p2p_network_yamux_reducer.rs b/p2p/src/network/yamux/p2p_network_yamux_reducer.rs index 90144a3cc6..5899a9f102 100644 --- a/p2p/src/network/yamux/p2p_network_yamux_reducer.rs +++ b/p2p/src/network/yamux/p2p_network_yamux_reducer.rs @@ -230,7 +230,7 @@ impl P2pNetworkYamuxState { let connection_state = >::substate(state)? .connection_state(&addr) - .ok_or_else(|| "Connection not found".to_owned())?; + .ok_or_else(|| format!("Connection not found {}", action.addr()))?; let stream = connection_state .yamux_state() diff --git a/p2p/src/p2p_config.rs b/p2p/src/p2p_config.rs index 8a7aaca3d5..2c82378ab7 100644 --- a/p2p/src/p2p_config.rs +++ b/p2p/src/p2p_config.rs @@ -7,9 +7,9 @@ use crate::{ }; pub const DEVNET_SEEDS: &[&str] = &[ - "/ip4/34.48.73.58/tcp/10003/p2p/12D3KooWAdgYL6hv18M3iDBdaK1dRygPivSfAfBNDzie6YqydVbs", - "/ip4/35.245.82.250/tcp/10003/p2p/12D3KooWLjs54xHzVmMmGYb7W5RVibqbwD1co7M2ZMfPgPm7iAag", - "/ip4/34.118.163.79/tcp/10003/p2p/12D3KooWEiGVAFC7curXWXiGZyMWnZK9h8BKr88U8D5PKV3dXciv", + "/ip4/34.45.167.81/tcp/10003/p2p/12D3KooWAdgYL6hv18M3iDBdaK1dRygPivSfAfBNDzie6YqydVbs", + "/ip4/34.28.194.121/tcp/10003/p2p/12D3KooWLjs54xHzVmMmGYb7W5RVibqbwD1co7M2ZMfPgPm7iAag", + "/ip4/34.44.189.148/tcp/10003/p2p/12D3KooWEiGVAFC7curXWXiGZyMWnZK9h8BKr88U8D5PKV3dXciv", ]; #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/p2p/src/p2p_event.rs b/p2p/src/p2p_event.rs index dd31f7f94f..f4d84a6249 100644 --- a/p2p/src/p2p_event.rs +++ b/p2p/src/p2p_event.rs @@ -48,6 +48,9 @@ pub enum MioEvent { /// The remote peer is disconnected gracefully or with an error. ConnectionDidClose(ConnectionAddr, Result<(), String>), + + /// The remote peer is disconnected by our node. + ConnectionDidCloseOnDemand(ConnectionAddr), } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -266,6 +269,9 @@ impl fmt::Display for MioEvent { Self::ConnectionDidClose(addr, res) => { write!(f, "ConnectionDidClose, {addr}, {}", res_kind(res)) } + Self::ConnectionDidCloseOnDemand(addr) => { + write!(f, "ConnectionDidCloseOnDemand, {addr}") + } } } } diff --git a/p2p/src/p2p_state.rs b/p2p/src/p2p_state.rs index 06bd2a2e49..6453dce40a 100644 --- a/p2p/src/p2p_state.rs +++ b/p2p/src/p2p_state.rs @@ -370,7 +370,7 @@ impl P2pPeerState { P2pPeerStatus::Connecting(P2pConnectionState::Outgoing( P2pConnectionOutgoingState::Error { time, .. }, )) => is_time_passed(now, *time, timeouts.outgoing_error_reconnect_timeout), - P2pPeerStatus::Disconnected { time } => { + P2pPeerStatus::Disconnected { time } | P2pPeerStatus::Disconnecting { time } => { *time == Timestamp::ZERO || is_time_passed(now, *time, timeouts.reconnect_timeout) } @@ -383,6 +383,7 @@ impl P2pPeerState { #[serde(tag = "state")] pub enum P2pPeerStatus { Connecting(P2pConnectionState), + Disconnecting { time: redux::Timestamp }, Disconnected { time: redux::Timestamp }, Ready(P2pPeerStatusReady), @@ -402,10 +403,15 @@ impl P2pPeerStatus { match self { Self::Connecting(s) => !s.is_error(), Self::Ready(_) => true, + Self::Disconnecting { .. } => false, Self::Disconnected { .. } => false, } } + pub fn is_disconnected_or_disconnecting(&self) -> bool { + matches!(self, Self::Disconnecting { .. } | Self::Disconnected { .. }) + } + pub fn as_connecting(&self) -> Option<&P2pConnectionState> { match self { Self::Connecting(v) => Some(v), diff --git a/p2p/src/service_impl/mio/mod.rs b/p2p/src/service_impl/mio/mod.rs index 6b2806b286..84d034aa26 100644 --- a/p2p/src/service_impl/mio/mod.rs +++ b/p2p/src/service_impl/mio/mod.rs @@ -523,11 +523,6 @@ where } self.connections.insert(addr, connection); } - } else { - self.send(MioEvent::IncomingDataDidReceive( - addr, - Err("not connected".to_string()), - )); } } Send(addr, buf) => { @@ -563,16 +558,17 @@ where self.send(MioEvent::ConnectionDidClose(addr, Err(err.to_string()))); } } - } else { - self.send(MioEvent::OutgoingDataDidSend( - addr, - Err("not connected".to_string()), - )); } } Disconnect(addr) => { // drop the connection and destructor will close it - self.connections.remove(&addr); + if let Some(mut cn) = self.connections.remove(&addr) { + self.poll + .registry() + .deregister(&mut cn.stream) + .unwrap_or_default(); + } + self.send(MioEvent::ConnectionDidCloseOnDemand(addr)); } } } diff --git a/p2p/testing/src/redux.rs b/p2p/testing/src/redux.rs index 5be720bb4f..e3816725bb 100644 --- a/p2p/testing/src/redux.rs +++ b/p2p/testing/src/redux.rs @@ -233,6 +233,9 @@ pub(super) fn event_effect(store: &mut crate::redux::Store, event: P2pEvent) -> ) } } + MioEvent::ConnectionDidCloseOnDemand(addr) => { + SubStore::dispatch(store, P2pNetworkSchedulerAction::Prune { addr }) + } }, _ => false, }