Skip to content

Commit

Permalink
Re-write Dialer behaviour logic
Browse files Browse the repository at this point in the history
  • Loading branch information
sandreae committed Jul 13, 2023
1 parent 5d9c42a commit b85b998
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 86 deletions.
166 changes: 84 additions & 82 deletions aquadoggo/src/network/dialer/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::task::{Context, Poll};

use exponential_backoff::Backoff;
use libp2p::core::Endpoint;
use libp2p::swarm::derive_prelude::ConnectionEstablished;
use libp2p::swarm::dummy::ConnectionHandler as DummyConnectionHandler;
use libp2p::swarm::{
ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure, FromSwarm, NetworkBehaviour,
PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
ConnectionDenied, ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, PollParameters,
THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use libp2p::{Multiaddr, PeerId};
use log::{debug, warn};
Expand All @@ -27,11 +27,8 @@ pub enum Event {
}

/// Status of a peer known to the dialer behaviour.
#[derive(Clone, Debug, Default)]
pub struct PeerStatus {
/// Number of existing connections to this peer.
connections: u32,

#[derive(Clone, Debug)]
pub struct RetryStatus {
/// Number of dial attempts to this peer since it was last disconnected.
///
/// Is reset to 0 once a connection is successfully established.
Expand All @@ -42,13 +39,22 @@ pub struct PeerStatus {
next_dial: Option<Instant>,
}

/// Behaviour responsible for dialing peers discovered by the `swarm`. If all connections close to
/// a peer, then the dialer attempts to reconnect to the peer with backoff until `RETRY_LIMIT` is
/// reached.
/// Behaviour responsible for dialing peers discovered by the `swarm` and retrying when dial
/// attempts fail or a connection is unexpectedly closed.
///
/// Maintains two lists of peers to be dialed:
/// 1) A queue of peers we want to make a new dial attempt to are queued up in `dial`. Peers are
/// placed here when they are first discovered or when an existing connection unexpectedly closes.
/// 2) Peers whose's initial dial failed are held in `retry` and dial attempts are retried with a
/// backoff until `RETRY_LIMIT` is reached. Peers are placed here when a `DialFailed` event is
/// issued from the swarm.
#[derive(Debug)]
pub struct Behaviour {
/// Discovered peers we want to dial.
peers: HashMap<PeerId, PeerStatus>,
/// Queue of known peers we want to make a new dial attempt to.
dial: VecDeque<PeerId>,

/// Map of peers whose initial dial attempt failed and we want to re-dial.
retry: HashMap<PeerId, RetryStatus>,

/// The backoff instance used for scheduling next_dials.
backoff: Backoff,
Expand All @@ -61,90 +67,78 @@ impl Behaviour {
let backoff = Backoff::new(RETRY_LIMIT, min, max);

Self {
peers: HashMap::new(),
dial: VecDeque::new(),
retry: HashMap::new(),
backoff,
}
}

/// Add a peer to the map of known peers who should be dialed.
pub fn peer_discovered(&mut self, peer_id: PeerId) {
if self.peers.get(&peer_id).is_none() {
self.peers.insert(peer_id, PeerStatus::default());
self.schedule_dial(&peer_id);
}
/// Add a known peer to the dial queue.
pub fn dial_peer(&mut self, peer_id: PeerId) {
self.dial.push_back(peer_id);
}

/// Remove a peer from the map of known peers.
/// Inform the behaviour that an unexpected error occurred on a connection to a peer, passing
/// in the number of remaining connections which exist.
///
/// Called externally if a peers registration expires on the `swarm`.
pub fn peer_expired(&mut self, peer_id: PeerId) {
// If the peer doesn't exist this means it was already dropped because the RETRY_LIMIT was
// reached, so don't try and remove it again.
if self.peers.get(&peer_id).is_some() {
self.remove_peer(&peer_id)
/// If there are no remaining connections for this peer we add the peer to the dial queue.
pub fn connection_error(&mut self, peer_id: PeerId, remaining_connections: u32) {
if remaining_connections == 0 {
self.dial_peer(peer_id);
}
}

/// Schedule a peer to be dialed.
/// Schedule a peer to be re-dialed.
///
/// Uses the configured `backoff` instance for the behaviour and the current `attempts` count
/// for the passed node to set the `next_dial` time for this peer.
///
/// If the `REDIAL_LIMIT` is reached then the peer is removed from the map of known peers and
/// no more redial attempts will occur.
fn schedule_dial(&mut self, peer_id: &PeerId) {
if let Some(status) = self.peers.get_mut(peer_id) {
fn schedule_retry(&mut self, peer_id: &PeerId) {
if let Some(status) = self.retry.get_mut(peer_id) {
// If the peer is already in the `retry` map then we check if there is a next backoff
// delay based on the current number of retry attempts.
if let Some(backoff_delay) = self.backoff.next(status.attempts) {
// If we haven't reached `RETRY_LIMIT` then we set the `next_dial` time.
status.next_dial = Some(Instant::now() + backoff_delay);
} else {
debug!("Re-dial attempt limit reached: remove peer {peer_id:?}");
self.remove_peer(peer_id)
}
}
}

fn on_connection_established(&mut self, peer_id: PeerId) {
match self.peers.get_mut(&peer_id) {
Some(status) => {
status.connections += 1;
}
None => {
warn!("Connected established to unknown peer");
// If we have reached `RETRY_LIMIT` then we remove the peer from the `retry` map.
debug!("Re-dial attempt limit reached: {peer_id:?}");
self.retry.remove(peer_id);
}
} else {
// If this peer was not in the `retry` map yet we instantiate a new retry status and
// insert the peer.
let backoff_delay = self
.backoff
.next(1)
.expect("Retry limit should be greater than 1");

let status = RetryStatus {
attempts: 1,
next_dial: Some(Instant::now() + backoff_delay),
};
self.retry.insert(peer_id.to_owned(), status);
}
}

fn on_connection_closed(&mut self, peer_id: &PeerId) {
match self.peers.get_mut(peer_id) {
Some(status) => {
status.connections -= 1;

if status.connections == 0 {
self.schedule_dial(peer_id)
}
}
None => {
warn!("Connected closed to unknown peer");
}
}
/// Inform the behaviour that a connection to a peer was established.
///
/// When a connection was successfully established we remove the peer from the retry map as we
/// no longer need to re-dial them.
fn on_connection_established(&mut self, peer_id: &PeerId) {
if self.retry.remove(peer_id).is_some() {
debug!("Removed peer from retry queue: {peer_id}");
};
}

/// Inform the behaviour that a dialing attempt failed.
///
/// We want the schedule the next dialing attempt or drop the peer completely if we have
/// reached `RETRY_LIMIT`.
fn on_dial_failed(&mut self, peer_id: &PeerId) {
match self.peers.get_mut(peer_id) {
Some(status) => {
if status.connections == 0 {
self.schedule_dial(peer_id)
}
}
None => warn!("Dial failed to unknown peer"),
}
}

fn remove_peer(&mut self, peer_id: &PeerId) {
if self.peers.remove(peer_id).is_none() {
// Don't warn here, peers can be removed twice if their registration on the swarm
// expired at the same time as their RETRY_LIMIT was reached.
}
self.schedule_retry(peer_id)
}
}

Expand Down Expand Up @@ -176,19 +170,18 @@ impl NetworkBehaviour for Behaviour {
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => {
self.on_connection_established(peer_id);
}
FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => {
self.on_connection_closed(&peer_id);
self.on_connection_established(&peer_id)
}
FromSwarm::DialFailure(DialFailure { peer_id, .. }) => {
if let Some(peer_id) = peer_id {
debug!("Dialing peer failed: {peer_id}");
self.on_dial_failed(&peer_id);
} else {
warn!("Dial failed to unknown peer")
}
}
FromSwarm::AddressChange(_)
FromSwarm::ConnectionClosed(_)
| FromSwarm::AddressChange(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
Expand All @@ -214,30 +207,39 @@ impl NetworkBehaviour for Behaviour {
_cx: &mut Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
let mut peer_to_dial = None;
// First dial the next peer which exist in the `dial` queue.
if let Some(peer_id) = self.dial.pop_back() {
debug!("Dial: {peer_id}");
return Poll::Ready(ToSwarm::GenerateEvent(Event::Dial(peer_id.to_owned())));
}

// If there were none we move onto peers we want to re-dial.
let mut peer_to_retry = None;
let now = Instant::now();

// Iterate over all peers and take the first one which has `next_dial` set and the
// scheduled dial time has passed.
for (peer_id, status) in &self.peers {
for (peer_id, status) in &self.retry {
if let Some(next_dial) = status.next_dial {
if next_dial < now {
peer_to_dial = Some(peer_id.to_owned());
peer_to_retry = Some(peer_id.to_owned());
break;
}
}
}

if let Some(peer_id) = peer_to_dial {
if let Some(peer_id) = peer_to_retry {
// Unwrap safely as we know the peer exists.
let status = self.peers.get_mut(&peer_id).unwrap();
let status = self.retry.get_mut(&peer_id).unwrap();

debug!("Re-dial attempt {} for peer {peer_id}", status.attempts);

// Increment the peers dial attempts.
status.attempts += 1;

// Set the peers `next_dial` value to None. This get's set again if the dial attempt fails.
status.next_dial = None;

debug!("Dial attempt {} for peer {peer_id}", status.attempts);
return Poll::Ready(ToSwarm::GenerateEvent(Event::Dial(peer_id.to_owned())));
}

Expand Down
19 changes: 15 additions & 4 deletions aquadoggo/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl EventLoop {
peer_id,
connection_id,
cause,
num_established,
..
} => match cause {
Some(ConnectionError::IO(error)) => {
Expand All @@ -254,6 +255,10 @@ impl EventLoop {
match error.to_string().as_str() {
"timed out" => {
debug!("Connection {connection_id:?} timed out with peer {peer_id}");
self.swarm
.behaviour_mut()
.dialer
.connection_error(peer_id, num_established);
}
"closed by peer: 0" => {
// We received an `ApplicationClose` with code 0 here which means the
Expand All @@ -262,6 +267,10 @@ impl EventLoop {
}
_ => {
warn!("Connection error occurred with peer {peer_id} on connection {connection_id:?}");
self.swarm
.behaviour_mut()
.dialer
.connection_error(peer_id, num_established);
}
}
}
Expand All @@ -270,6 +279,10 @@ impl EventLoop {
}
Some(ConnectionError::Handler(_)) => {
warn!("Connection handler error occurred with peer {peer_id}");
self.swarm
.behaviour_mut()
.dialer
.connection_error(peer_id, num_established);
}
None => {
debug!("Connection closed with peer {peer_id}");
Expand Down Expand Up @@ -326,13 +339,12 @@ impl EventLoop {
mdns::Event::Discovered(list) => {
for (peer_id, _multiaddr) in list {
debug!("mDNS discovered a new peer: {peer_id}");
self.swarm.behaviour_mut().dialer.peer_discovered(peer_id)
self.swarm.behaviour_mut().dialer.dial_peer(peer_id)
}
}
mdns::Event::Expired(list) => {
for (peer_id, _multiaddr) in list {
trace!("mDNS peer has expired: {peer_id}");
self.swarm.behaviour_mut().dialer.peer_expired(peer_id)
}
}
},
Expand Down Expand Up @@ -370,7 +382,7 @@ impl EventLoop {
// Only dial remote peers discovered via rendezvous server
if peer_id != local_peer_id {
debug!("Discovered peer {peer_id} at {address}");
self.swarm.behaviour_mut().dialer.peer_discovered(peer_id);
self.swarm.behaviour_mut().dialer.dial_peer(peer_id);
}
}
}
Expand All @@ -383,7 +395,6 @@ impl EventLoop {
}
rendezvous::client::Event::Expired { peer } => {
trace!("Peer registration with rendezvous expired: {peer:?}");
self.swarm.behaviour_mut().dialer.peer_expired(peer);
}
},
SwarmEvent::Behaviour(BehaviourEvent::RendezvousServer(event)) => match event {
Expand Down

0 comments on commit b85b998

Please sign in to comment.