Skip to content

Commit

Permalink
fix: decrease connection info based on current state
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Mar 15, 2024
1 parent 4e1c56f commit 71f8581
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 11 deletions.
4 changes: 2 additions & 2 deletions crates/net/network/src/manager.rs
Expand Up @@ -796,7 +796,7 @@ where
);

if let Some(ref err) = error {
self.swarm.state_mut().peers_mut().on_pending_session_dropped(
self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
&remote_addr,
&peer_id,
err,
Expand All @@ -809,7 +809,7 @@ where
self.swarm
.state_mut()
.peers_mut()
.on_pending_session_gracefully_closed(&peer_id);
.on_outgoing_pending_session_gracefully_closed(&peer_id);
}
self.metrics.closed_sessions.increment(1);
self.metrics
Expand Down
47 changes: 38 additions & 9 deletions crates/net/network/src/peers/manager.rs
Expand Up @@ -427,19 +427,16 @@ impl PeersManager {
}

/// Gracefully disconnected a pending _outgoing_ session
pub(crate) fn on_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.peers.get_mut(peer_id) {
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
} else {
return
}

self.connection_info.decr_out();
}

/// Invoked when an _outgoing_ pending session was closed during authentication or the
/// handshake.
pub(crate) fn on_pending_session_dropped(
pub(crate) fn on_outgoing_pending_session_dropped(
&mut self,
remote_addr: &SocketAddr,
peer_id: &PeerId,
Expand Down Expand Up @@ -1675,7 +1672,7 @@ mod tests {
})
.await;

peers.on_pending_session_dropped(
peers.on_outgoing_pending_session_dropped(
&socket_addr,
&peer,
&PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
Expand Down Expand Up @@ -1838,7 +1835,7 @@ mod tests {
})
.await;

peers.on_pending_session_dropped(
peers.on_outgoing_pending_session_dropped(
&socket_addr,
&peer,
&PendingSessionHandshakeError::Eth(
Expand Down Expand Up @@ -1888,7 +1885,7 @@ mod tests {
})
.await;

peers.on_pending_session_dropped(
peers.on_outgoing_pending_session_dropped(
&socket_addr,
&peer,
&PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
Expand Down Expand Up @@ -2204,6 +2201,38 @@ mod tests {
assert_eq!(peers.num_outbound_connections(), 0);
}

#[tokio::test]
async fn test_outgoing_connection_gracefully_closed() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, socket_addr, None);

match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}

let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut);

assert_eq!(peers.num_outbound_connections(), 0);

peers.on_outgoing_pending_session_gracefully_closed(&peer);

assert_eq!(peers.num_outbound_connections(), 0);
assert_eq!(peers.connection_info.num_pending_out, 0);
}

#[tokio::test]
async fn test_discovery_ban_list() {
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
Expand Down

0 comments on commit 71f8581

Please sign in to comment.