From b85b9980413a5622df7d6d44e9f793957a4805f8 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 13 Jul 2023 18:52:01 +0100 Subject: [PATCH] Re-write `Dialer` behaviour logic --- aquadoggo/src/network/dialer/behaviour.rs | 166 +++++++++++----------- aquadoggo/src/network/service.rs | 19 ++- 2 files changed, 99 insertions(+), 86 deletions(-) diff --git a/aquadoggo/src/network/dialer/behaviour.rs b/aquadoggo/src/network/dialer/behaviour.rs index d448269bb..eed3ed1f8 100644 --- a/aquadoggo/src/network/dialer/behaviour.rs +++ b/aquadoggo/src/network/dialer/behaviour.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::task::{Context, Poll}; use exponential_backoff::Backoff; @@ -8,8 +8,8 @@ use libp2p::core::Endpoint; use libp2p::swarm::derive_prelude::ConnectionEstablished; use libp2p::swarm::dummy::ConnectionHandler as DummyConnectionHandler; use libp2p::swarm::{ - ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, - PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + ConnectionDenied, ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, PollParameters, + THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use libp2p::{Multiaddr, PeerId}; use log::{debug, warn}; @@ -27,11 +27,8 @@ pub enum Event { } /// Status of a peer known to the dialer behaviour. -#[derive(Clone, Debug, Default)] -pub struct PeerStatus { - /// Number of existing connections to this peer. - connections: u32, - +#[derive(Clone, Debug)] +pub struct RetryStatus { /// Number of dial attempts to this peer since it was last disconnected. /// /// Is reset to 0 once a connection is successfully established. @@ -42,13 +39,22 @@ pub struct PeerStatus { next_dial: Option, } -/// Behaviour responsible for dialing peers discovered by the `swarm`. 
If all connections close to -/// a peer, then the dialer attempts to reconnect to the peer with backoff until `RETRY_LIMIT` is -/// reached. +/// Behaviour responsible for dialing peers discovered by the `swarm` and retrying when dial +/// attempts fail or a connection is unexpectedly closed. +/// +/// Maintains two lists of peers to be dialed: +/// 1) Peers we want to make a new dial attempt to are queued up in `dial`. Peers are +/// placed here when they are first discovered or when an existing connection unexpectedly closes. +/// 2) Peers whose initial dial failed are held in `retry` and dial attempts are retried with a +/// backoff until `RETRY_LIMIT` is reached. Peers are placed here when a `DialFailure` event is +/// issued from the swarm. #[derive(Debug)] pub struct Behaviour { - /// Discovered peers we want to dial. - peers: HashMap, + /// Queue of known peers we want to make a new dial attempt to. + dial: VecDeque, + + /// Map of peers whose initial dial attempt failed and we want to re-dial. + retry: HashMap, /// The backoff instance used for scheduling next_dials. backoff: Backoff, @@ -61,90 +67,78 @@ impl Behaviour { let backoff = Backoff::new(RETRY_LIMIT, min, max); Self { - peers: HashMap::new(), + dial: VecDeque::new(), + retry: HashMap::new(), backoff, } } - /// Add a peer to the map of known peers who should be dialed. - pub fn peer_discovered(&mut self, peer_id: PeerId) { - if self.peers.get(&peer_id).is_none() { - self.peers.insert(peer_id, PeerStatus::default()); - self.schedule_dial(&peer_id); - } + /// Add a known peer to the dial queue. + pub fn dial_peer(&mut self, peer_id: PeerId) { + self.dial.push_back(peer_id); } - /// Remove a peer from the map of known peers. + /// Inform the behaviour that an unexpected error occurred on a connection to a peer, passing + /// in the number of remaining connections which exist. /// - /// Called externally if a peers registration expires on the `swarm`. 
- pub fn peer_expired(&mut self, peer_id: PeerId) { - // If the peer doesn't exist this means it was already dropped because the RETRY_LIMIT was - // reached, so don't try and remove it again. - if self.peers.get(&peer_id).is_some() { - self.remove_peer(&peer_id) + /// If there are no remaining connections for this peer we add the peer to the dial queue. + pub fn connection_error(&mut self, peer_id: PeerId, remaining_connections: u32) { + if remaining_connections == 0 { + self.dial_peer(peer_id); } } - /// Schedule a peer to be dialed. + /// Schedule a peer to be re-dialed. /// /// Uses the configured `backoff` instance for the behaviour and the current `attempts` count /// for the passed node to set the `next_dial` time for this peer. /// /// If the `REDIAL_LIMIT` is reached then the peer is removed from the map of known peers and /// no more redial attempts will occur. - fn schedule_dial(&mut self, peer_id: &PeerId) { - if let Some(status) = self.peers.get_mut(peer_id) { + fn schedule_retry(&mut self, peer_id: &PeerId) { + if let Some(status) = self.retry.get_mut(peer_id) { + // If the peer is already in the `retry` map then we check if there is a next backoff + // delay based on the current number of retry attempts. if let Some(backoff_delay) = self.backoff.next(status.attempts) { + // If we haven't reached `RETRY_LIMIT` then we set the `next_dial` time. status.next_dial = Some(Instant::now() + backoff_delay); } else { - debug!("Re-dial attempt limit reached: remove peer {peer_id:?}"); - self.remove_peer(peer_id) - } - } - } - - fn on_connection_established(&mut self, peer_id: PeerId) { - match self.peers.get_mut(&peer_id) { - Some(status) => { - status.connections += 1; - } - None => { - warn!("Connected established to unknown peer"); + // If we have reached `RETRY_LIMIT` then we remove the peer from the `retry` map. 
+ debug!("Re-dial attempt limit reached: {peer_id:?}"); + self.retry.remove(peer_id); } + } else { + // If this peer was not in the `retry` map yet we instantiate a new retry status and + // insert the peer. + let backoff_delay = self + .backoff + .next(1) + .expect("Retry limit should be greater than 1"); + + let status = RetryStatus { + attempts: 1, + next_dial: Some(Instant::now() + backoff_delay), + }; + self.retry.insert(peer_id.to_owned(), status); } } - fn on_connection_closed(&mut self, peer_id: &PeerId) { - match self.peers.get_mut(peer_id) { - Some(status) => { - status.connections -= 1; - - if status.connections == 0 { - self.schedule_dial(peer_id) - } - } - None => { - warn!("Connected closed to unknown peer"); - } - } + /// Inform the behaviour that a connection to a peer was established. + /// + /// When a connection was successfully established we remove the peer from the retry map as we + /// no longer need to re-dial them. + fn on_connection_established(&mut self, peer_id: &PeerId) { + if self.retry.remove(peer_id).is_some() { + debug!("Removed peer from retry queue: {peer_id}"); + }; } + /// Inform the behaviour that a dialing attempt failed. + /// + /// We want to schedule the next dialing attempt or drop the peer completely if we have + /// reached `RETRY_LIMIT`. fn on_dial_failed(&mut self, peer_id: &PeerId) { - match self.peers.get_mut(peer_id) { - Some(status) => { - if status.connections == 0 { - self.schedule_dial(peer_id) - } - } - None => warn!("Dial failed to unknown peer"), - } - } - - fn remove_peer(&mut self, peer_id: &PeerId) { - if self.peers.remove(peer_id).is_none() { - // Don't warn here, peers can be removed twice if their registration on the swarm - // expired at the same time as their RETRY_LIMIT was reached. 
- } + self.schedule_retry(peer_id) } } @@ -176,19 +170,18 @@ impl NetworkBehaviour for Behaviour { fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => { - self.on_connection_established(peer_id); - } - FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => { - self.on_connection_closed(&peer_id); + self.on_connection_established(&peer_id) } FromSwarm::DialFailure(DialFailure { peer_id, .. }) => { if let Some(peer_id) = peer_id { + debug!("Dialing peer failed: {peer_id}"); self.on_dial_failed(&peer_id); } else { warn!("Dial failed to unknown peer") } } - FromSwarm::AddressChange(_) + FromSwarm::ConnectionClosed(_) + | FromSwarm::AddressChange(_) | FromSwarm::ListenFailure(_) | FromSwarm::NewListener(_) | FromSwarm::NewListenAddr(_) @@ -214,22 +207,32 @@ impl NetworkBehaviour for Behaviour { _cx: &mut Context<'_>, _params: &mut impl PollParameters, ) -> Poll>> { - let mut peer_to_dial = None; + // First dial the next peer which exists in the `dial` queue. + if let Some(peer_id) = self.dial.pop_back() { + debug!("Dial: {peer_id}"); + return Poll::Ready(ToSwarm::GenerateEvent(Event::Dial(peer_id.to_owned()))); + } + + // If there were none we move on to peers we want to re-dial. + let mut peer_to_retry = None; let now = Instant::now(); // Iterate over all peers and take the first one which has `next_dial` set and the // scheduled dial time has passed. - for (peer_id, status) in &self.peers { + for (peer_id, status) in &self.retry { if let Some(next_dial) = status.next_dial { if next_dial < now { - peer_to_dial = Some(peer_id.to_owned()); + peer_to_retry = Some(peer_id.to_owned()); + break; } } } - if let Some(peer_id) = peer_to_dial { + if let Some(peer_id) = peer_to_retry { // Unwrap safely as we know the peer exists. 
- let status = self.peers.get_mut(&peer_id).unwrap(); + let status = self.retry.get_mut(&peer_id).unwrap(); + + debug!("Re-dial attempt {} for peer {peer_id}", status.attempts); // Increment the peers dial attempts. status.attempts += 1; @@ -237,7 +240,6 @@ impl NetworkBehaviour for Behaviour { // Set the peers `next_dial` value to None. This get's set again if the dial attempt fails. status.next_dial = None; - debug!("Dial attempt {} for peer {peer_id}", status.attempts); return Poll::Ready(ToSwarm::GenerateEvent(Event::Dial(peer_id.to_owned()))); } diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index 3ab9d45ea..293a39d84 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -246,6 +246,7 @@ impl EventLoop { peer_id, connection_id, cause, + num_established, .. } => match cause { Some(ConnectionError::IO(error)) => { @@ -254,6 +255,10 @@ impl EventLoop { match error.to_string().as_str() { "timed out" => { debug!("Connection {connection_id:?} timed out with peer {peer_id}"); + self.swarm + .behaviour_mut() + .dialer + .connection_error(peer_id, num_established); } "closed by peer: 0" => { // We received an `ApplicationClose` with code 0 here which means the @@ -262,6 +267,10 @@ impl EventLoop { } _ => { warn!("Connection error occurred with peer {peer_id} on connection {connection_id:?}"); + self.swarm + .behaviour_mut() + .dialer + .connection_error(peer_id, num_established); } } } @@ -270,6 +279,10 @@ impl EventLoop { } Some(ConnectionError::Handler(_)) => { warn!("Connection handler error occurred with peer {peer_id}"); + self.swarm + .behaviour_mut() + .dialer + .connection_error(peer_id, num_established); } None => { debug!("Connection closed with peer {peer_id}"); @@ -326,13 +339,12 @@ impl EventLoop { mdns::Event::Discovered(list) => { for (peer_id, _multiaddr) in list { debug!("mDNS discovered a new peer: {peer_id}"); - self.swarm.behaviour_mut().dialer.peer_discovered(peer_id) + 
self.swarm.behaviour_mut().dialer.dial_peer(peer_id) } } mdns::Event::Expired(list) => { for (peer_id, _multiaddr) in list { trace!("mDNS peer has expired: {peer_id}"); - self.swarm.behaviour_mut().dialer.peer_expired(peer_id) } } }, @@ -370,7 +382,7 @@ impl EventLoop { // Only dial remote peers discovered via rendezvous server if peer_id != local_peer_id { debug!("Discovered peer {peer_id} at {address}"); - self.swarm.behaviour_mut().dialer.peer_discovered(peer_id); + self.swarm.behaviour_mut().dialer.dial_peer(peer_id); } } } @@ -383,7 +395,6 @@ impl EventLoop { } rendezvous::client::Event::Expired { peer } => { trace!("Peer registration with rendezvous expired: {peer:?}"); - self.swarm.behaviour_mut().dialer.peer_expired(peer); } }, SwarmEvent::Behaviour(BehaviourEvent::RendezvousServer(event)) => match event {