Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: decrease connection info based on current state #7165

Merged
merged 1 commit into from Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/net/network/src/manager.rs
Expand Up @@ -796,7 +796,7 @@ where
);

if let Some(ref err) = error {
self.swarm.state_mut().peers_mut().on_pending_session_dropped(
self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
&remote_addr,
&peer_id,
err,
Expand All @@ -809,7 +809,7 @@ where
self.swarm
.state_mut()
.peers_mut()
.on_pending_session_gracefully_closed(&peer_id);
.on_outgoing_pending_session_gracefully_closed(&peer_id);
}
self.metrics.closed_sessions.increment(1);
self.metrics
Expand Down
47 changes: 38 additions & 9 deletions crates/net/network/src/peers/manager.rs
Expand Up @@ -427,19 +427,16 @@ impl PeersManager {
}

/// Gracefully disconnected a pending _outgoing_ session
pub(crate) fn on_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.peers.get_mut(peer_id) {
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
} else {
return
}

self.connection_info.decr_out();
}

/// Invoked when an _outgoing_ pending session was closed during authentication or the
/// handshake.
pub(crate) fn on_pending_session_dropped(
pub(crate) fn on_outgoing_pending_session_dropped(
&mut self,
remote_addr: &SocketAddr,
peer_id: &PeerId,
Expand Down Expand Up @@ -1675,7 +1672,7 @@ mod tests {
})
.await;

peers.on_pending_session_dropped(
peers.on_outgoing_pending_session_dropped(
&socket_addr,
&peer,
&PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
Expand Down Expand Up @@ -1838,7 +1835,7 @@ mod tests {
})
.await;

peers.on_pending_session_dropped(
peers.on_outgoing_pending_session_dropped(
&socket_addr,
&peer,
&PendingSessionHandshakeError::Eth(
Expand Down Expand Up @@ -1888,7 +1885,7 @@ mod tests {
})
.await;

peers.on_pending_session_dropped(
peers.on_outgoing_pending_session_dropped(
&socket_addr,
&peer,
&PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
Expand Down Expand Up @@ -2204,6 +2201,38 @@ mod tests {
assert_eq!(peers.num_outbound_connections(), 0);
}

#[tokio::test]
async fn test_outgoing_connection_gracefully_closed() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, socket_addr, None);

match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}

let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut);

assert_eq!(peers.num_outbound_connections(), 0);

peers.on_outgoing_pending_session_gracefully_closed(&peer);

assert_eq!(peers.num_outbound_connections(), 0);
assert_eq!(peers.connection_info.num_pending_out, 0);
}

#[tokio::test]
async fn test_discovery_ban_list() {
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
Expand Down