From 3aa298171489f803636e731fe74ce3ad75cefc65 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 11 Jul 2023 21:40:40 +0900 Subject: [PATCH 01/16] WIP: redial behaviour --- aquadoggo/src/network/behaviour.rs | 7 +- aquadoggo/src/network/mod.rs | 1 + aquadoggo/src/network/redial/behaviour.rs | 194 ++++++++++++++++++++++ aquadoggo/src/network/redial/mod.rs | 5 + aquadoggo/src/network/service.rs | 15 +- 5 files changed, 212 insertions(+), 10 deletions(-) create mode 100644 aquadoggo/src/network/redial/behaviour.rs create mode 100644 aquadoggo/src/network/redial/mod.rs diff --git a/aquadoggo/src/network/behaviour.rs b/aquadoggo/src/network/behaviour.rs index 1742d480d..999a04334 100644 --- a/aquadoggo/src/network/behaviour.rs +++ b/aquadoggo/src/network/behaviour.rs @@ -10,8 +10,8 @@ use libp2p::{autonat, connection_limits, identify, mdns, ping, relay, rendezvous use log::debug; use crate::network::config::NODE_NAMESPACE; -use crate::network::peers; use crate::network::NetworkConfiguration; +use crate::network::{peers, redial}; /// How often do we broadcast mDNS queries into the network. const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(5); @@ -72,6 +72,8 @@ pub struct Behaviour { /// Register peer connections and handle p2panda messaging with them. 
pub peers: peers::Behaviour, + + pub redial: redial::Behaviour, } impl Behaviour { @@ -171,6 +173,8 @@ impl Behaviour { // Create behaviour to manage peer connections and handle p2panda messaging let peers = peers::Behaviour::new(); + let redial = redial::Behaviour::new(); + Ok(Self { autonat: autonat.into(), identify: identify.into(), @@ -182,6 +186,7 @@ impl Behaviour { relay_client: relay_client.into(), relay_server: relay_server.into(), peers, + redial }) } } diff --git a/aquadoggo/src/network/mod.rs b/aquadoggo/src/network/mod.rs index a5440ba00..f2a089546 100644 --- a/aquadoggo/src/network/mod.rs +++ b/aquadoggo/src/network/mod.rs @@ -4,6 +4,7 @@ mod behaviour; mod config; pub mod identity; mod peers; +mod redial; mod service; mod shutdown; mod swarm; diff --git a/aquadoggo/src/network/redial/behaviour.rs b/aquadoggo/src/network/redial/behaviour.rs new file mode 100644 index 000000000..c252fac1a --- /dev/null +++ b/aquadoggo/src/network/redial/behaviour.rs @@ -0,0 +1,194 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::collections::{HashMap, VecDeque}; +use std::task::{Context, Poll}; + +use libp2p::core::Endpoint; +use libp2p::swarm::derive_prelude::ConnectionEstablished; +use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; +use libp2p::swarm::dummy::ConnectionHandler as DummyConnectionHandler; +use libp2p::swarm::{ + ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, + PollParameters, SwarmEvent, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, +}; +use libp2p::{Multiaddr, PeerId}; +use log::{warn, debug}; + +/// The empty type for cases which can't occur. 
+#[derive(Clone, Debug)] +pub enum Event { + DialPeer(PeerId), +} + +#[derive(Clone, Debug, Default)] +pub struct PeerStatus { + connections: i32, + dial_attempts: i32, +} + +#[derive(Debug)] +pub struct Behaviour { + events: VecDeque, + peers: HashMap, +} + +impl Behaviour { + pub fn new() -> Self { + Self { + events: VecDeque::new(), + peers: HashMap::new(), + } + } + + pub fn peer_discovered(&mut self, peer_id: PeerId) { + if self.peers.get(&peer_id).is_none() { + self.peers.insert(peer_id, PeerStatus::default()); + self.events + .push_back(peer_id.clone()); + } + } + + pub fn peer_expired(&mut self, peer_id: PeerId) { + self.remove_peer(&peer_id) + } + + fn redial_peer(&mut self, peer_id: &PeerId) { + debug!("Re-dial peer: {peer_id:?}"); + if let Some(status) = self.peers.get(peer_id) { + if status.connections == 0 { + if status.dial_attempts < 10 { + self.events + .push_back(peer_id.clone()); + } else { + debug!("Re-dial attempt limit reached: remove peer {peer_id:?}"); + self.remove_peer(&peer_id) + } + } + } + } + + fn on_connection_established(&mut self, peer_id: PeerId) { + match self.peers.get_mut(&peer_id) { + Some(status) => { + status.dial_attempts = 0; + status.connections += 1; + } + None => { + warn!("Connected established to unknown peer"); + } + } + } + + fn on_connection_closed(&mut self, peer_id: &PeerId) { + match self.peers.get_mut(peer_id) { + Some(status) => { + status.connections -= 1; + + self.redial_peer(peer_id) + + } + None => { + warn!("Connected closed to unknown peer"); + } + } + } + + fn on_dial_failed(&mut self, peer_id: &PeerId) { + match self.peers.get_mut(peer_id) { + Some(status) => { + status.dial_attempts += 1; + + self.redial_peer(peer_id) + } + None => warn!("Dial failed to unknown peer"), + } + } + + fn remove_peer(&mut self, peer_id: &PeerId) { + if self.peers.remove(&peer_id).is_none() { + warn!("Tried to remove unknown peer") + } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = 
DummyConnectionHandler; + + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(DummyConnectionHandler) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(DummyConnectionHandler) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => { + self.on_connection_established(peer_id); + } + FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => { + self.on_connection_closed(&peer_id); + } + FromSwarm::DialFailure(DialFailure { + peer_id, + error, + connection_id, + }) => { + if let Some(peer_id) = peer_id { + self.on_dial_failed(&peer_id); + } else { + warn!("Dial failed to unknown peer") + } + } + FromSwarm::AddressChange(_) + | FromSwarm::ListenFailure(_) + | FromSwarm::NewListener(_) + | FromSwarm::NewListenAddr(_) + | FromSwarm::ExpiredListenAddr(_) + | FromSwarm::ListenerError(_) + | FromSwarm::ListenerClosed(_) + | FromSwarm::NewExternalAddrCandidate(_) + | FromSwarm::ExternalAddrConfirmed(_) + | FromSwarm::ExternalAddrExpired(_) => {} + } + } + + fn on_connection_handler_event( + &mut self, + _id: PeerId, + _: ConnectionId, + _: THandlerOutEvent, + ) { + () + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + _params: &mut impl PollParameters, + ) -> Poll>> { + if let Some(peer_id) = self.events.pop_front() { + return Poll::Ready(ToSwarm::Dial { + opts: DialOpts::peer_id(peer_id).build(), + }); + // return Poll::Ready(event); + } + + Poll::Pending + } +} diff --git a/aquadoggo/src/network/redial/mod.rs b/aquadoggo/src/network/redial/mod.rs new file mode 100644 index 000000000..c284a6fc7 --- /dev/null +++ b/aquadoggo/src/network/redial/mod.rs @@ -0,0 +1,5 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +mod 
behaviour; + +pub use behaviour::Behaviour; diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index 66077549b..8bc428490 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -325,18 +325,13 @@ impl EventLoop { mdns::Event::Discovered(list) => { for (peer_id, multiaddr) in list { debug!("mDNS discovered a new peer: {peer_id}"); - - if let Err(err) = self.swarm.dial(multiaddr) { - warn!("Failed to dial: {}", err); - } else { - debug!("Dial success: skip remaining addresses for: {peer_id}"); - break; - } + self.swarm.behaviour_mut().redial.peer_discovered(peer_id) } } mdns::Event::Expired(list) => { - for (peer, _multiaddr) in list { - trace!("mDNS peer has expired: {peer}"); + for (peer_id, _multiaddr) in list { + trace!("mDNS peer has expired: {peer_id}"); + self.swarm.behaviour_mut().redial.peer_expired(peer_id) } } }, @@ -535,6 +530,8 @@ impl EventLoop { )) } }, + + SwarmEvent::Behaviour(BehaviourEvent::Redial(_)) => (), } } } From dea17200119bf35946eb51f4243f4078ab53b32a Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 11 Jul 2023 21:50:01 +0900 Subject: [PATCH 02/16] Improve log messages and doc strings in ConnectionManager --- aquadoggo/src/replication/service.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/aquadoggo/src/replication/service.rs b/aquadoggo/src/replication/service.rs index 2e858c7b3..35069cf04 100644 --- a/aquadoggo/src/replication/service.rs +++ b/aquadoggo/src/replication/service.rs @@ -143,9 +143,9 @@ impl ConnectionManager { TargetSet::new(&supported_schema_ids) } - /// Register a new peer in the network. + /// Register a new peer connection on the manager. 
async fn on_connection_established(&mut self, peer: Peer) { - info!("Connected to peer: {}", peer.display()); + info!("Established connection with peer: {}", peer.display()); match self.peers.get(&peer) { Some(_) => { @@ -158,20 +158,19 @@ impl ConnectionManager { } } - /// Handle a peer disconnecting from the network. + /// Handle a peer connection closing. async fn on_connection_closed(&mut self, peer: Peer) { - info!("Disconnected from peer: {}", peer.display()); + info!("Closed connection with peer: {}", peer.display()); // Clear running replication sessions from sync manager self.sync_manager.remove_sessions(&peer); self.remove_connection(peer) } - /// Remove a peer from the network. + /// Remove a peer connection from the manager. fn remove_connection(&mut self, peer: Peer) { - match self.peers.remove(&peer) { - Some(_) => debug!("Remove peer: {}", peer.display()), - None => warn!("Tried to remove connection from unknown peer"), + if self.peers.remove(&peer).is_none() { + warn!("Tried to remove connection from unknown peer") } } From 6a12f545b3b53c4f60ddaba28662d20ff49ca863 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 12 Jul 2023 16:05:29 +0100 Subject: [PATCH 03/16] Add `void` to dependencies --- Cargo.lock | 1 + aquadoggo/Cargo.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 39a13c2d9..c8909b81a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -129,6 +129,7 @@ dependencies = [ "tower-http", "tower-service", "triggered", + "void", ] [[package]] diff --git a/aquadoggo/Cargo.toml b/aquadoggo/Cargo.toml index a25f01655..fdf045765 100644 --- a/aquadoggo/Cargo.toml +++ b/aquadoggo/Cargo.toml @@ -75,6 +75,7 @@ tower-http = { version = "0.3.4", default-features = false, features = [ "cors", ] } triggered = "0.1.2" +void = "1.0.2" [dev-dependencies] ciborium = "0.2.0" From 69e1e8b6491946f1d0d8b61c980e72ccb833e6f5 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 12 Jul 2023 16:06:14 +0100 Subject: [PATCH 04/16] Basic 
dial/re-dial implementation, module renamed `dialer` --- aquadoggo/src/network/behaviour.rs | 6 +- .../network/{redial => dialer}/behaviour.rs | 89 +++++++++++-------- .../src/network/{redial => dialer}/mod.rs | 2 +- aquadoggo/src/network/mod.rs | 2 +- aquadoggo/src/network/peers/handler.rs | 12 +-- aquadoggo/src/network/service.rs | 29 ++++-- 6 files changed, 83 insertions(+), 57 deletions(-) rename aquadoggo/src/network/{redial => dialer}/behaviour.rs (64%) rename aquadoggo/src/network/{redial => dialer}/mod.rs (61%) diff --git a/aquadoggo/src/network/behaviour.rs b/aquadoggo/src/network/behaviour.rs index 999a04334..d70f07646 100644 --- a/aquadoggo/src/network/behaviour.rs +++ b/aquadoggo/src/network/behaviour.rs @@ -11,7 +11,7 @@ use log::debug; use crate::network::config::NODE_NAMESPACE; use crate::network::NetworkConfiguration; -use crate::network::{peers, redial}; +use crate::network::{peers, dialer}; /// How often do we broadcast mDNS queries into the network. const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(5); @@ -73,7 +73,7 @@ pub struct Behaviour { /// Register peer connections and handle p2panda messaging with them. 
pub peers: peers::Behaviour, - pub redial: redial::Behaviour, + pub redial: dialer::Behaviour, } impl Behaviour { @@ -173,7 +173,7 @@ impl Behaviour { // Create behaviour to manage peer connections and handle p2panda messaging let peers = peers::Behaviour::new(); - let redial = redial::Behaviour::new(); + let redial = dialer::Behaviour::new(); Ok(Self { autonat: autonat.into(), diff --git a/aquadoggo/src/network/redial/behaviour.rs b/aquadoggo/src/network/dialer/behaviour.rs similarity index 64% rename from aquadoggo/src/network/redial/behaviour.rs rename to aquadoggo/src/network/dialer/behaviour.rs index c252fac1a..c3fea2c2f 100644 --- a/aquadoggo/src/network/redial/behaviour.rs +++ b/aquadoggo/src/network/dialer/behaviour.rs @@ -5,30 +5,45 @@ use std::task::{Context, Poll}; use libp2p::core::Endpoint; use libp2p::swarm::derive_prelude::ConnectionEstablished; -use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; use libp2p::swarm::dummy::ConnectionHandler as DummyConnectionHandler; use libp2p::swarm::{ ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, - PollParameters, SwarmEvent, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use libp2p::{Multiaddr, PeerId}; -use log::{warn, debug}; +use log::{debug, warn}; +use void::Void; -/// The empty type for cases which can't occur. +const DIAL_LIMIT: i32 = 10; + +/// Events which are sent to the swarm from the dialer. #[derive(Clone, Debug)] pub enum Event { - DialPeer(PeerId), + Dial(PeerId), } +/// Status of a peer known to the dialer behaviour. #[derive(Clone, Debug, Default)] pub struct PeerStatus { + /// Number of existing connections to this peer. connections: i32, + + /// Number of failed dial attempts since the last successful connection. + /// + /// Resets back to zero once a connection is established. 
dial_attempts: i32, } +/// Behaviour responsible for dialing peers discovered by the `swarm`. If all connections close to +/// a peer, then the dialer attempts to reconnect to the peer. +/// +/// @TODO: Need to add back-off to redial attempts. #[derive(Debug)] pub struct Behaviour { - events: VecDeque, + /// The behaviours event queue which gets consumed when `poll` is called. + events: VecDeque>, + + /// Peers we are dialing. peers: HashMap, } @@ -40,30 +55,40 @@ impl Behaviour { } } + /// Add a peer to the map of known peers which this behaviour should be dialing. pub fn peer_discovered(&mut self, peer_id: PeerId) { if self.peers.get(&peer_id).is_none() { self.peers.insert(peer_id, PeerStatus::default()); self.events - .push_back(peer_id.clone()); + .push_back(ToSwarm::GenerateEvent(Event::Dial(peer_id.clone()))); } } + /// Remove a peer from the behaviours map of known peers. + /// + /// Called externally if a peer is known to have expired. pub fn peer_expired(&mut self, peer_id: PeerId) { self.remove_peer(&peer_id) } - fn redial_peer(&mut self, peer_id: &PeerId) { - debug!("Re-dial peer: {peer_id:?}"); - if let Some(status) = self.peers.get(peer_id) { - if status.connections == 0 { - if status.dial_attempts < 10 { - self.events - .push_back(peer_id.clone()); - } else { - debug!("Re-dial attempt limit reached: remove peer {peer_id:?}"); - self.remove_peer(&peer_id) - } - } + /// Issues `Dial` event to the `swarm` for the passed peer. + /// + /// Checks `dial_attempts` for peer first, if this has exceeded the `DIAL_LIMIT` then no + /// `Dial` event is issued. 
+ fn dial_peer(&mut self, peer_id: &PeerId) { + if let Some(status) = self.peers.get_mut(peer_id) { + status.dial_attempts += 1; + if status.dial_attempts < DIAL_LIMIT { + debug!( + "Attempt redial {} with peer: {peer_id:?}", + status.dial_attempts + ); + self.events + .push_back(ToSwarm::GenerateEvent(Event::Dial(peer_id.clone()))); + } else { + debug!("Re-dial attempt limit reached: remove peer {peer_id:?}"); + self.remove_peer(&peer_id) + } } } @@ -84,8 +109,9 @@ impl Behaviour { Some(status) => { status.connections -= 1; - self.redial_peer(peer_id) - + if status.connections == 0 { + self.dial_peer(peer_id) + } } None => { warn!("Connected closed to unknown peer"); @@ -96,9 +122,9 @@ impl Behaviour { fn on_dial_failed(&mut self, peer_id: &PeerId) { match self.peers.get_mut(peer_id) { Some(status) => { - status.dial_attempts += 1; - - self.redial_peer(peer_id) + if status.connections == 0 { + self.dial_peer(peer_id) + } } None => warn!("Dial failed to unknown peer"), } @@ -144,11 +170,7 @@ impl NetworkBehaviour for Behaviour { FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => { self.on_connection_closed(&peer_id); } - FromSwarm::DialFailure(DialFailure { - peer_id, - error, - connection_id, - }) => { + FromSwarm::DialFailure(DialFailure { peer_id, .. 
}) => { if let Some(peer_id) = peer_id { self.on_dial_failed(&peer_id); } else { @@ -174,7 +196,7 @@ impl NetworkBehaviour for Behaviour { _: ConnectionId, _: THandlerOutEvent, ) { - () + (); } fn poll( @@ -182,11 +204,8 @@ impl NetworkBehaviour for Behaviour { _cx: &mut Context<'_>, _params: &mut impl PollParameters, ) -> Poll>> { - if let Some(peer_id) = self.events.pop_front() { - return Poll::Ready(ToSwarm::Dial { - opts: DialOpts::peer_id(peer_id).build(), - }); - // return Poll::Ready(event); + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); } Poll::Pending diff --git a/aquadoggo/src/network/redial/mod.rs b/aquadoggo/src/network/dialer/mod.rs similarity index 61% rename from aquadoggo/src/network/redial/mod.rs rename to aquadoggo/src/network/dialer/mod.rs index c284a6fc7..32e362fb4 100644 --- a/aquadoggo/src/network/redial/mod.rs +++ b/aquadoggo/src/network/dialer/mod.rs @@ -2,4 +2,4 @@ mod behaviour; -pub use behaviour::Behaviour; +pub use behaviour::{Behaviour, Event}; diff --git a/aquadoggo/src/network/mod.rs b/aquadoggo/src/network/mod.rs index f2a089546..4c54039d2 100644 --- a/aquadoggo/src/network/mod.rs +++ b/aquadoggo/src/network/mod.rs @@ -4,7 +4,7 @@ mod behaviour; mod config; pub mod identity; mod peers; -mod redial; +mod dialer; mod service; mod shutdown; mod swarm; diff --git a/aquadoggo/src/network/peers/handler.rs b/aquadoggo/src/network/peers/handler.rs index 625095f3d..5e5eb1abe 100644 --- a/aquadoggo/src/network/peers/handler.rs +++ b/aquadoggo/src/network/peers/handler.rs @@ -221,15 +221,9 @@ impl ConnectionHandler for Handler { } ConnectionEvent::DialUpgradeError(_) | ConnectionEvent::AddressChange(_) - | ConnectionEvent::ListenUpgradeError(_) => { - warn!("Connection event error"); - } - ConnectionEvent::LocalProtocolsChange(_) => { - debug!("ConnectionEvent: LocalProtocolsChange") - } - ConnectionEvent::RemoteProtocolsChange(_) => { - debug!("ConnectionEvent: RemoteProtocolsChange") - } + | 
ConnectionEvent::ListenUpgradeError(_) + | ConnectionEvent::LocalProtocolsChange(_) + | ConnectionEvent::RemoteProtocolsChange(_) => (), } } diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index 8bc428490..6741c4e6f 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -5,6 +5,7 @@ use std::time::Duration; use anyhow::Result; use libp2p::multiaddr::Protocol; use libp2p::ping::Event; +use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; use libp2p::swarm::{ConnectionError, SwarmEvent}; use libp2p::{autonat, identify, mdns, rendezvous, Multiaddr, PeerId, Swarm}; use log::{debug, info, trace, warn}; @@ -17,7 +18,7 @@ use crate::context::Context; use crate::manager::{ServiceReadySender, Shutdown}; use crate::network::behaviour::{Behaviour, BehaviourEvent}; use crate::network::config::NODE_NAMESPACE; -use crate::network::{identity, peers, swarm, NetworkConfiguration, ShutdownHandler}; +use crate::network::{identity, peers, dialer, swarm, NetworkConfiguration, ShutdownHandler}; /// Network service that configures and deploys a libp2p network swarm over QUIC transports. /// @@ -260,7 +261,7 @@ impl EventLoop { debug!("Connection {connection_id:?} closed with peer {peer_id}"); } _ => { - warn!("Connection error occurred with peer {peer_id} on connection {connection_id:?}: {error}"); + warn!("Connection error occurred with peer {peer_id} on connection {connection_id:?}"); } } } @@ -287,9 +288,9 @@ impl EventLoop { connection_id, local_addr, send_back_addr, - error, + .. } => { - warn!("Incoming connection error occurred with {local_addr} and {send_back_addr} on connectino {connection_id:?}: {error}"); + warn!("Incoming connection error occurred with {local_addr} and {send_back_addr} on connection {connection_id:?}"); } SwarmEvent::ListenerClosed { listener_id, @@ -308,13 +309,13 @@ impl EventLoop { SwarmEvent::OutgoingConnectionError { connection_id, peer_id, - error, + .. 
} => match peer_id { Some(id) => { - warn!("Outgoing connection error with peer {id} occurred on connection {connection_id:?}: {error}"); + warn!("Outgoing connection error with peer {id} occurred on connection {connection_id:?}"); } None => { - warn!("Outgoing connection error occurred on connection {connection_id:?}: {error}"); + warn!("Outgoing connection error occurred on connection {connection_id:?}"); } }, @@ -531,7 +532,19 @@ impl EventLoop { } }, - SwarmEvent::Behaviour(BehaviourEvent::Redial(_)) => (), + SwarmEvent::Behaviour(BehaviourEvent::Redial(event)) => match event { + dialer::Event::Dial(peer_id) => { + match self.swarm.dial( + DialOpts::peer_id(peer_id) + .condition(PeerCondition::Disconnected) + .condition(PeerCondition::NotDialing) + .build(), + ) { + Ok(_) => debug!("Dialing peer: {peer_id}"), + Err(_) => warn!("Error dialing peer: {peer_id}"), + }; + } + }, } } } From be309aa7e80339c747233c546d8ad44a3ee1a008 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 13 Jul 2023 08:10:29 +0100 Subject: [PATCH 05/16] Add exponential-backoff to dependencies --- Cargo.lock | 10 ++++++++++ aquadoggo/Cargo.toml | 1 + 2 files changed, 11 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index c8909b81a..67ffd5edd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,6 +103,7 @@ dependencies = [ "dynamic-graphql", "env_logger", "envy", + "exponential-backoff", "futures", "hex", "http", @@ -1337,6 +1338,15 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "exponential-backoff" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47f78d87d930eee4b5686a2ab032de499c72bd1e954b84262bb03492a0f932cd" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "fast_chemail" version = "0.9.6" diff --git a/aquadoggo/Cargo.toml b/aquadoggo/Cargo.toml index fdf045765..f2ec875c5 100644 --- 
a/aquadoggo/Cargo.toml +++ b/aquadoggo/Cargo.toml @@ -30,6 +30,7 @@ deadqueue = { version = "0.2.3", default-features = false, features = [ directories = "4.0.1" dynamic-graphql = "0.7.3" envy = "0.4.2" +exponential-backoff = "1.2.0" futures = "0.3.23" hex = "0.4.3" http = "0.2.9" From eeeaad20ed9e98dd2f2aa8169629c694dea68452 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 13 Jul 2023 08:10:40 +0100 Subject: [PATCH 06/16] Introduce backoff to dialer --- aquadoggo/src/network/dialer/behaviour.rs | 109 ++++++++++++++-------- 1 file changed, 72 insertions(+), 37 deletions(-) diff --git a/aquadoggo/src/network/dialer/behaviour.rs b/aquadoggo/src/network/dialer/behaviour.rs index c3fea2c2f..bba484402 100644 --- a/aquadoggo/src/network/dialer/behaviour.rs +++ b/aquadoggo/src/network/dialer/behaviour.rs @@ -1,8 +1,9 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use std::collections::{HashMap, VecDeque}; +use std::collections::HashMap; use std::task::{Context, Poll}; +use exponential_backoff::Backoff; use libp2p::core::Endpoint; use libp2p::swarm::derive_prelude::ConnectionEstablished; use libp2p::swarm::dummy::ConnectionHandler as DummyConnectionHandler; @@ -12,13 +13,16 @@ use libp2p::swarm::{ }; use libp2p::{Multiaddr, PeerId}; use log::{debug, warn}; -use void::Void; +use std::time::{Duration, Instant}; -const DIAL_LIMIT: i32 = 10; +const RETRY_LIMIT: u32 = 8; +const BACKOFF_SEC_MIN: u64 = 1; +const BACKOFF_SEC_MAX: u64 = 10; /// Events which are sent to the swarm from the dialer. #[derive(Clone, Debug)] pub enum Event { + /// Event sent to request that the swarm dials a known peer. Dial(PeerId), } @@ -26,65 +30,73 @@ pub enum Event { #[derive(Clone, Debug, Default)] pub struct PeerStatus { /// Number of existing connections to this peer. - connections: i32, + connections: u32, - /// Number of failed dial attempts since the last successful connection. + /// Number of dial attempts to this peer since it was last disconnected. 
/// - /// Resets back to zero once a connection is established. - dial_attempts: i32, + /// Is reset to 0 once a connection is successfully established. + attempts: u32, + + /// Time at which the peer should be dialed. Is None if this peer has a successfully + /// established connection. + next_dial: Option, } /// Behaviour responsible for dialing peers discovered by the `swarm`. If all connections close to /// a peer, then the dialer attempts to reconnect to the peer. /// -/// @TODO: Need to add back-off to redial attempts. +/// @TODO: Need to add back-off to next_dial attempts. #[derive(Debug)] pub struct Behaviour { - /// The behaviours event queue which gets consumed when `poll` is called. - events: VecDeque>, - - /// Peers we are dialing. + /// Peers we want to dial. peers: HashMap, + + /// The backoff instance used for scheduling next_dials. + backoff: Backoff, } impl Behaviour { pub fn new() -> Self { + let min = Duration::from_secs(BACKOFF_SEC_MIN); + let max = Duration::from_secs(BACKOFF_SEC_MAX); + let backoff = Backoff::new(RETRY_LIMIT, min, max); + Self { - events: VecDeque::new(), peers: HashMap::new(), + backoff, } } - /// Add a peer to the map of known peers which this behaviour should be dialing. + /// Add a peer to the map of known peers who should be dialed. pub fn peer_discovered(&mut self, peer_id: PeerId) { if self.peers.get(&peer_id).is_none() { self.peers.insert(peer_id, PeerStatus::default()); - self.events - .push_back(ToSwarm::GenerateEvent(Event::Dial(peer_id.clone()))); + self.schedule_dial(&peer_id); } } - /// Remove a peer from the behaviours map of known peers. + /// Remove a peer from the map of known peers. /// - /// Called externally if a peer is known to have expired. + /// Called externally if a peers registration expires on the `swarm`. 
pub fn peer_expired(&mut self, peer_id: PeerId) { - self.remove_peer(&peer_id) + // If the peer doesn't exist this means it was already dropped because the RETRY_LIMIT was + // reached, so don't try and remove it again. + if self.peers.get(&peer_id).is_some() { + self.remove_peer(&peer_id) + } } - /// Issues `Dial` event to the `swarm` for the passed peer. + /// Schedule a peer to be dialed. + /// + /// Uses the configured `backoff` instance for the behaviour and the current `attempts` count + /// for the passed node to set the `next_dial` time for this peer. /// - /// Checks `dial_attempts` for peer first, if this has exceeded the `DIAL_LIMIT` then no - /// `Dial` event is issued. - fn dial_peer(&mut self, peer_id: &PeerId) { + /// If the `RETRY_LIMIT` is reached then the peer is removed from the map of known peers and + /// no more redial attempts will occur. + fn schedule_dial(&mut self, peer_id: &PeerId) { if let Some(status) = self.peers.get_mut(peer_id) { - status.dial_attempts += 1; - if status.dial_attempts < DIAL_LIMIT { - debug!( - "Attempt redial {} with peer: {peer_id:?}", - status.dial_attempts - ); - self.events - .push_back(ToSwarm::GenerateEvent(Event::Dial(peer_id.clone()))); + if let Some(backoff_delay) = self.backoff.next(status.attempts) { + status.next_dial = Some(Instant::now() + backoff_delay); } else { debug!("Re-dial attempt limit reached: remove peer {peer_id:?}"); self.remove_peer(&peer_id) @@ -95,7 +107,6 @@ impl Behaviour { fn on_connection_established(&mut self, peer_id: PeerId) { match self.peers.get_mut(&peer_id) { Some(status) => { - status.dial_attempts = 0; status.connections += 1; } None => { @@ -110,7 +121,7 @@ impl Behaviour { status.connections -= 1; if status.connections == 0 { - self.dial_peer(peer_id) + self.schedule_dial(peer_id) } } None => { @@ -123,7 +134,7 @@ impl Behaviour { match self.peers.get_mut(peer_id) { Some(status) => { if status.connections == 0 { - self.dial_peer(peer_id) + self.schedule_dial(peer_id) } } 
None => warn!("Dial failed to unknown peer"), @@ -132,7 +143,8 @@ impl Behaviour { fn remove_peer(&mut self, peer_id: &PeerId) { if self.peers.remove(&peer_id).is_none() { - warn!("Tried to remove unknown peer") + // Don't warn here, peers can be removed twice if their registration on the swarm + // expired at the same time as their RETRY_LIMIT was reached. } } } @@ -204,8 +216,31 @@ impl NetworkBehaviour for Behaviour { _cx: &mut Context<'_>, _params: &mut impl PollParameters, ) -> Poll>> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(event); + let mut peer_to_dial = None; + let now = Instant::now(); + + // Iterate over all peers and pick one which has `next_dial` set and whose + // scheduled dial time has passed. + for (peer_id, status) in &self.peers { + if let Some(next_dial) = status.next_dial { + if next_dial < now { + peer_to_dial = Some(peer_id.to_owned()); + } + } + } + + if let Some(peer_id) = peer_to_dial { + // Unwrap safely as we know the peer exists. + let status = self.peers.get_mut(&peer_id).unwrap(); + + // Increment the peer's dial attempts. + status.attempts += 1; + + // Set the peer's `next_dial` value to None. This gets set again if the dial attempt fails. 
+ status.next_dial = None; + + debug!("Dial attempt {} for peer {peer_id}", status.attempts); + return Poll::Ready(ToSwarm::GenerateEvent(Event::Dial(peer_id.to_owned()))); } Poll::Pending From 86e4cb83b776a56a806999066c52ca61b753b19c Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 13 Jul 2023 08:11:07 +0100 Subject: [PATCH 07/16] Use dialer when peers discovered via rendezvous server --- aquadoggo/src/network/service.rs | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index 6741c4e6f..20580eff4 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -370,21 +370,7 @@ impl EventLoop { // Only dial remote peers discovered via rendezvous server if peer_id != local_peer_id { debug!("Discovered peer {peer_id} at {address}"); - - let p2p_suffix = Protocol::P2p(peer_id); - let address_with_p2p = if !address - .ends_with(&Multiaddr::empty().with(p2p_suffix.clone())) - { - address.clone().with(p2p_suffix) - } else { - address.clone() - }; - - debug!("Preparing to dial peer {peer_id} at {address}"); - - if let Err(err) = self.swarm.dial(address_with_p2p) { - warn!("Failed to dial: {}", err); - } + self.swarm.behaviour_mut().redial.peer_discovered(peer_id); } } } @@ -396,7 +382,8 @@ impl EventLoop { trace!("Discovery failed: {error:?}") } rendezvous::client::Event::Expired { peer } => { - trace!("Peer registration with rendezvous expired: {peer:?}") + trace!("Peer registration with rendezvous expired: {peer:?}"); + self.swarm.behaviour_mut().redial.peer_expired(peer); } }, SwarmEvent::Behaviour(BehaviourEvent::RendezvousServer(event)) => match event { From 2e624150e430d235632ed2861fdd17cee20ebf4e Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 13 Jul 2023 08:13:26 +0100 Subject: [PATCH 08/16] Correct field name --- aquadoggo/src/network/behaviour.rs | 8 +++++--- aquadoggo/src/network/service.rs | 10 +++++----- 2 files changed, 
10 insertions(+), 8 deletions(-) diff --git a/aquadoggo/src/network/behaviour.rs b/aquadoggo/src/network/behaviour.rs index d70f07646..2e720eb15 100644 --- a/aquadoggo/src/network/behaviour.rs +++ b/aquadoggo/src/network/behaviour.rs @@ -73,7 +73,8 @@ pub struct Behaviour { /// Register peer connections and handle p2panda messaging with them. pub peers: peers::Behaviour, - pub redial: dialer::Behaviour, + /// Dial discovered peers and redial with backoff if all connections are lost. + pub dialer: dialer::Behaviour, } impl Behaviour { @@ -173,7 +174,8 @@ impl Behaviour { // Create behaviour to manage peer connections and handle p2panda messaging let peers = peers::Behaviour::new(); - let redial = dialer::Behaviour::new(); + // Create a behaviour to dial discovered peers. + let dialer = dialer::Behaviour::new(); Ok(Self { autonat: autonat.into(), @@ -186,7 +188,7 @@ impl Behaviour { relay_client: relay_client.into(), relay_server: relay_server.into(), peers, - redial + dialer }) } } diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index 20580eff4..755a9b0fe 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -326,13 +326,13 @@ impl EventLoop { mdns::Event::Discovered(list) => { for (peer_id, multiaddr) in list { debug!("mDNS discovered a new peer: {peer_id}"); - self.swarm.behaviour_mut().redial.peer_discovered(peer_id) + self.swarm.behaviour_mut().dialer.peer_discovered(peer_id) } } mdns::Event::Expired(list) => { for (peer_id, _multiaddr) in list { trace!("mDNS peer has expired: {peer_id}"); - self.swarm.behaviour_mut().redial.peer_expired(peer_id) + self.swarm.behaviour_mut().dialer.peer_expired(peer_id) } } }, @@ -370,7 +370,7 @@ impl EventLoop { // Only dial remote peers discovered via rendezvous server if peer_id != local_peer_id { debug!("Discovered peer {peer_id} at {address}"); - self.swarm.behaviour_mut().redial.peer_discovered(peer_id); + 
self.swarm.behaviour_mut().dialer.peer_discovered(peer_id); } } } @@ -383,7 +383,7 @@ impl EventLoop { } rendezvous::client::Event::Expired { peer } => { trace!("Peer registration with rendezvous expired: {peer:?}"); - self.swarm.behaviour_mut().redial.peer_expired(peer); + self.swarm.behaviour_mut().dialer.peer_expired(peer); } }, SwarmEvent::Behaviour(BehaviourEvent::RendezvousServer(event)) => match event { @@ -519,7 +519,7 @@ impl EventLoop { } }, - SwarmEvent::Behaviour(BehaviourEvent::Redial(event)) => match event { + SwarmEvent::Behaviour(BehaviourEvent::Dialer(event)) => match event { dialer::Event::Dial(peer_id) => { match self.swarm.dial( DialOpts::peer_id(peer_id) From ea2089a490b6e249d1074dc11455b7e960e0ef77 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 13 Jul 2023 08:16:36 +0100 Subject: [PATCH 09/16] Clippy & fmt --- aquadoggo/src/network/behaviour.rs | 4 ++-- aquadoggo/src/network/dialer/behaviour.rs | 8 +++----- aquadoggo/src/network/mod.rs | 2 +- aquadoggo/src/network/peers/handler.rs | 2 +- aquadoggo/src/network/service.rs | 10 +++++----- aquadoggo/src/replication/service.rs | 2 +- 6 files changed, 13 insertions(+), 15 deletions(-) diff --git a/aquadoggo/src/network/behaviour.rs b/aquadoggo/src/network/behaviour.rs index 2e720eb15..5a0edbc87 100644 --- a/aquadoggo/src/network/behaviour.rs +++ b/aquadoggo/src/network/behaviour.rs @@ -11,7 +11,7 @@ use log::debug; use crate::network::config::NODE_NAMESPACE; use crate::network::NetworkConfiguration; -use crate::network::{peers, dialer}; +use crate::network::{dialer, peers}; /// How often do we broadcast mDNS queries into the network. 
const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(5); @@ -188,7 +188,7 @@ impl Behaviour { relay_client: relay_client.into(), relay_server: relay_server.into(), peers, - dialer + dialer, }) } } diff --git a/aquadoggo/src/network/dialer/behaviour.rs b/aquadoggo/src/network/dialer/behaviour.rs index bba484402..ff83468b4 100644 --- a/aquadoggo/src/network/dialer/behaviour.rs +++ b/aquadoggo/src/network/dialer/behaviour.rs @@ -99,7 +99,7 @@ impl Behaviour { status.next_dial = Some(Instant::now() + backoff_delay); } else { debug!("Re-dial attempt limit reached: remove peer {peer_id:?}"); - self.remove_peer(&peer_id) + self.remove_peer(peer_id) } } } @@ -142,7 +142,7 @@ impl Behaviour { } fn remove_peer(&mut self, peer_id: &PeerId) { - if self.peers.remove(&peer_id).is_none() { + if self.peers.remove(peer_id).is_none() { // Don't warn here, peers can be removed twice if their registration on the swarm // expired at the same time as their RETRY_LIMIT was reached. } @@ -207,9 +207,7 @@ impl NetworkBehaviour for Behaviour { _id: PeerId, _: ConnectionId, _: THandlerOutEvent, - ) { - (); - } + ) {} fn poll( &mut self, diff --git a/aquadoggo/src/network/mod.rs b/aquadoggo/src/network/mod.rs index 4c54039d2..b6452f522 100644 --- a/aquadoggo/src/network/mod.rs +++ b/aquadoggo/src/network/mod.rs @@ -2,9 +2,9 @@ mod behaviour; mod config; +mod dialer; pub mod identity; mod peers; -mod dialer; mod service; mod shutdown; mod swarm; diff --git a/aquadoggo/src/network/peers/handler.rs b/aquadoggo/src/network/peers/handler.rs index 5e5eb1abe..b45ba9d24 100644 --- a/aquadoggo/src/network/peers/handler.rs +++ b/aquadoggo/src/network/peers/handler.rs @@ -12,7 +12,7 @@ use libp2p::swarm::{ ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream as NegotiatedStream, SubstreamProtocol, }; -use log::{debug, warn}; +use log::warn; use thiserror::Error; use crate::network::peers::{Codec, CodecError, Protocol}; diff --git a/aquadoggo/src/network/service.rs 
b/aquadoggo/src/network/service.rs index 755a9b0fe..3ab9d45ea 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -18,7 +18,7 @@ use crate::context::Context; use crate::manager::{ServiceReadySender, Shutdown}; use crate::network::behaviour::{Behaviour, BehaviourEvent}; use crate::network::config::NODE_NAMESPACE; -use crate::network::{identity, peers, dialer, swarm, NetworkConfiguration, ShutdownHandler}; +use crate::network::{dialer, identity, peers, swarm, NetworkConfiguration, ShutdownHandler}; /// Network service that configures and deploys a libp2p network swarm over QUIC transports. /// @@ -324,7 +324,7 @@ impl EventLoop { // ~~~~ SwarmEvent::Behaviour(BehaviourEvent::Mdns(event)) => match event { mdns::Event::Discovered(list) => { - for (peer_id, multiaddr) in list { + for (peer_id, _multiaddr) in list { debug!("mDNS discovered a new peer: {peer_id}"); self.swarm.behaviour_mut().dialer.peer_discovered(peer_id) } @@ -523,9 +523,9 @@ impl EventLoop { dialer::Event::Dial(peer_id) => { match self.swarm.dial( DialOpts::peer_id(peer_id) - .condition(PeerCondition::Disconnected) - .condition(PeerCondition::NotDialing) - .build(), + .condition(PeerCondition::Disconnected) + .condition(PeerCondition::NotDialing) + .build(), ) { Ok(_) => debug!("Dialing peer: {peer_id}"), Err(_) => warn!("Error dialing peer: {peer_id}"), diff --git a/aquadoggo/src/replication/service.rs b/aquadoggo/src/replication/service.rs index 35069cf04..63dcb3d22 100644 --- a/aquadoggo/src/replication/service.rs +++ b/aquadoggo/src/replication/service.rs @@ -5,7 +5,7 @@ use std::time::Duration; use anyhow::Result; use libp2p::PeerId; -use log::{debug, info, trace, warn}; +use log::{info, trace, warn}; use p2panda_rs::schema::SchemaId; use p2panda_rs::Human; use tokio::task; From 726edd843e8d42ee0d01e60d0c1c4cb90d1d942c Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 13 Jul 2023 08:18:09 +0100 Subject: [PATCH 10/16] fmt --- 
aquadoggo/src/network/dialer/behaviour.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aquadoggo/src/network/dialer/behaviour.rs b/aquadoggo/src/network/dialer/behaviour.rs index ff83468b4..802eff3a1 100644 --- a/aquadoggo/src/network/dialer/behaviour.rs +++ b/aquadoggo/src/network/dialer/behaviour.rs @@ -207,7 +207,8 @@ impl NetworkBehaviour for Behaviour { _id: PeerId, _: ConnectionId, _: THandlerOutEvent, - ) {} + ) { + } fn poll( &mut self, From a85c7ca13e6ded977f12bc08a910bb53a87240ac Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 13 Jul 2023 08:19:35 +0100 Subject: [PATCH 11/16] Increase `BACKOFF_SEC_MAX` --- aquadoggo/src/network/dialer/behaviour.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/src/network/dialer/behaviour.rs b/aquadoggo/src/network/dialer/behaviour.rs index 802eff3a1..954b2758d 100644 --- a/aquadoggo/src/network/dialer/behaviour.rs +++ b/aquadoggo/src/network/dialer/behaviour.rs @@ -17,7 +17,7 @@ use std::time::{Duration, Instant}; const RETRY_LIMIT: u32 = 8; const BACKOFF_SEC_MIN: u64 = 1; -const BACKOFF_SEC_MAX: u64 = 10; +const BACKOFF_SEC_MAX: u64 = 60; /// Events which are sent to the swarm from the dialer. #[derive(Clone, Debug)] From c03bbcb1909455d96abdbb9e624fcb5dd0e18b20 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 13 Jul 2023 08:34:54 +0100 Subject: [PATCH 12/16] Doc string fixes --- aquadoggo/src/network/dialer/behaviour.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/aquadoggo/src/network/dialer/behaviour.rs b/aquadoggo/src/network/dialer/behaviour.rs index 954b2758d..d448269bb 100644 --- a/aquadoggo/src/network/dialer/behaviour.rs +++ b/aquadoggo/src/network/dialer/behaviour.rs @@ -26,7 +26,7 @@ pub enum Event { Dial(PeerId), } -/// Status of a peer known to the dealer behaviour. +/// Status of a peer known to the dialer behaviour. 
#[derive(Clone, Debug, Default)] pub struct PeerStatus { /// Number of existing connections to this peer. @@ -43,12 +43,11 @@ pub struct PeerStatus { } /// Behaviour responsible for dialing peers discovered by the `swarm`. If all connections close to -/// a peer, then the dialer attempts to reconnect to the peer. -/// -/// @TODO: Need to add back-off to next_dial attempts. +/// a peer, then the dialer attempts to reconnect to the peer with backoff until `RETRY_LIMIT` is +/// reached. #[derive(Debug)] pub struct Behaviour { - /// Peers we want to dial. + /// Discovered peers we want to dial. peers: HashMap, /// The backoff instance used for scheduling next_dials. From 5d9c42ac581494807d82cd799e505e6e62791374 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 13 Jul 2023 08:35:56 +0100 Subject: [PATCH 13/16] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37f3fb00f..e5072f7d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Integrate replication manager with networking stack [#387](https://github.com/p2panda/aquadoggo/pull/387) 🥞 - Reverse lookup for pinned relations in dependency task [#434](https://github.com/p2panda/aquadoggo/pull/434) - Persist and maintain index of operation's position in document [#438](https://github.com/p2panda/aquadoggo/pull/438) +- Introduce `dialer` behaviour with retry logic [#444](https://github.com/p2panda/aquadoggo/pull/444) ### Changed From b85b9980413a5622df7d6d44e9f793957a4805f8 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 13 Jul 2023 18:52:01 +0100 Subject: [PATCH 14/16] Re-write `Dialer` behaviour logic --- aquadoggo/src/network/dialer/behaviour.rs | 166 +++++++++++----------- aquadoggo/src/network/service.rs | 19 ++- 2 files changed, 99 insertions(+), 86 deletions(-) diff --git a/aquadoggo/src/network/dialer/behaviour.rs 
b/aquadoggo/src/network/dialer/behaviour.rs index d448269bb..eed3ed1f8 100644 --- a/aquadoggo/src/network/dialer/behaviour.rs +++ b/aquadoggo/src/network/dialer/behaviour.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::task::{Context, Poll}; use exponential_backoff::Backoff; @@ -8,8 +8,8 @@ use libp2p::core::Endpoint; use libp2p::swarm::derive_prelude::ConnectionEstablished; use libp2p::swarm::dummy::ConnectionHandler as DummyConnectionHandler; use libp2p::swarm::{ - ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, - PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + ConnectionDenied, ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, PollParameters, + THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use libp2p::{Multiaddr, PeerId}; use log::{debug, warn}; @@ -27,11 +27,8 @@ pub enum Event { } /// Status of a peer known to the dialer behaviour. -#[derive(Clone, Debug, Default)] -pub struct PeerStatus { - /// Number of existing connections to this peer. - connections: u32, - +#[derive(Clone, Debug)] +pub struct RetryStatus { /// Number of dial attempts to this peer since it was last disconnected. /// /// Is reset to 0 once a connection is successfully established. @@ -42,13 +39,22 @@ pub struct PeerStatus { next_dial: Option, } -/// Behaviour responsible for dialing peers discovered by the `swarm`. If all connections close to -/// a peer, then the dialer attempts to reconnect to the peer with backoff until `RETRY_LIMIT` is -/// reached. +/// Behaviour responsible for dialing peers discovered by the `swarm` and retrying when dial +/// attempts fail or a connection is unexpectedly closed. +/// +/// Maintains two lists of peers to be dialed: +/// 1) A queue of peers we want to make a new dial attempt to are queued up in `dial`. 
Peers are +/// placed here when they are first discovered or when an existing connection unexpectedly closes. +/// 2) Peers whose initial dial failed are held in `retry` and dial attempts are retried with a +/// backoff until `RETRY_LIMIT` is reached. Peers are placed here when a `DialFailure` event is +/// issued from the swarm. #[derive(Debug)] pub struct Behaviour { - /// Discovered peers we want to dial. - peers: HashMap, + /// Queue of known peers we want to make a new dial attempt to. + dial: VecDeque, + + /// Map of peers whose initial dial attempt failed and we want to re-dial. + retry: HashMap, /// The backoff instance used for scheduling next_dials. backoff: Backoff, @@ -61,90 +67,78 @@ impl Behaviour { let backoff = Backoff::new(RETRY_LIMIT, min, max); Self { - peers: HashMap::new(), + dial: VecDeque::new(), + retry: HashMap::new(), backoff, } } - /// Add a peer to the map of known peers who should be dialed. - pub fn peer_discovered(&mut self, peer_id: PeerId) { - if self.peers.get(&peer_id).is_none() { - self.peers.insert(peer_id, PeerStatus::default()); - self.schedule_dial(&peer_id); - } + /// Add a known peer to the dial queue. + pub fn dial_peer(&mut self, peer_id: PeerId) { + self.dial.push_back(peer_id); } - /// Remove a peer from the map of known peers. + /// Inform the behaviour that an unexpected error occurred on a connection to a peer, passing + /// in the number of remaining connections which exist. /// - /// Called externally if a peers registration expires on the `swarm`. - pub fn peer_expired(&mut self, peer_id: PeerId) { - // If the peer doesn't exist this means it was already dropped because the RETRY_LIMIT was - // reached, so don't try and remove it again. - if self.peers.get(&peer_id).is_some() { - self.remove_peer(&peer_id) + /// If there are no remaining connections for this peer we add the peer to the dial queue.
+ pub fn connection_error(&mut self, peer_id: PeerId, remaining_connections: u32) { + if remaining_connections == 0 { + self.dial_peer(peer_id); } } - /// Schedule a peer to be dialed. + /// Schedule a peer to be re-dialed. /// /// Uses the configured `backoff` instance for the behaviour and the current `attempts` count /// for the passed node to set the `next_dial` time for this peer. /// /// If the `REDIAL_LIMIT` is reached then the peer is removed from the map of known peers and /// no more redial attempts will occur. - fn schedule_dial(&mut self, peer_id: &PeerId) { - if let Some(status) = self.peers.get_mut(peer_id) { + fn schedule_retry(&mut self, peer_id: &PeerId) { + if let Some(status) = self.retry.get_mut(peer_id) { + // If the peer is already in the `retry` map then we check if there is a next backoff + // delay based on the current number of retry attempts. if let Some(backoff_delay) = self.backoff.next(status.attempts) { + // If we haven't reached `RETRY_LIMIT` then we set the `next_dial` time. status.next_dial = Some(Instant::now() + backoff_delay); } else { - debug!("Re-dial attempt limit reached: remove peer {peer_id:?}"); - self.remove_peer(peer_id) - } - } - } - - fn on_connection_established(&mut self, peer_id: PeerId) { - match self.peers.get_mut(&peer_id) { - Some(status) => { - status.connections += 1; - } - None => { - warn!("Connected established to unknown peer"); + // If we have reached `RETRY_LIMIT` then we remove the peer from the `retry` map. + debug!("Re-dial attempt limit reached: {peer_id:?}"); + self.retry.remove(peer_id); } + } else { + // If this peer was not in the `retry` map yet we instantiate a new retry status and + // insert the peer. 
+ let backoff_delay = self + .backoff + .next(1) + .expect("Retry limit should be greater than 1"); + + let status = RetryStatus { + attempts: 1, + next_dial: Some(Instant::now() + backoff_delay), + }; + self.retry.insert(peer_id.to_owned(), status); } } - fn on_connection_closed(&mut self, peer_id: &PeerId) { - match self.peers.get_mut(peer_id) { - Some(status) => { - status.connections -= 1; - - if status.connections == 0 { - self.schedule_dial(peer_id) - } - } - None => { - warn!("Connected closed to unknown peer"); - } - } + /// Inform the behaviour that a connection to a peer was established. + /// + /// When a connection was successfully established we remove the peer from the retry map as we + /// no longer need to re-dial them. + fn on_connection_established(&mut self, peer_id: &PeerId) { + if self.retry.remove(peer_id).is_some() { + debug!("Removed peer from retry queue: {peer_id}"); + }; } + /// Inform the behaviour that a dialing attempt failed. + /// + /// We want to schedule the next dialing attempt or drop the peer completely if we have + /// reached `RETRY_LIMIT`. fn on_dial_failed(&mut self, peer_id: &PeerId) { - match self.peers.get_mut(peer_id) { - Some(status) => { - if status.connections == 0 { - self.schedule_dial(peer_id) - } - } - None => warn!("Dial failed to unknown peer"), - } - } - - fn remove_peer(&mut self, peer_id: &PeerId) { - if self.peers.remove(peer_id).is_none() { - // Don't warn here, peers can be removed twice if their registration on the swarm - // expired at the same time as their RETRY_LIMIT was reached. - } + self.schedule_retry(peer_id) } } @@ -176,19 +170,18 @@ impl NetworkBehaviour for Behaviour { fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => { - self.on_connection_established(peer_id); - } - FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, ..
}) => { - self.on_connection_closed(&peer_id); + self.on_connection_established(&peer_id) } FromSwarm::DialFailure(DialFailure { peer_id, .. }) => { if let Some(peer_id) = peer_id { + debug!("Dialing peer failed: {peer_id}"); self.on_dial_failed(&peer_id); } else { warn!("Dial failed to unknown peer") } } - FromSwarm::AddressChange(_) + FromSwarm::ConnectionClosed(_) + | FromSwarm::AddressChange(_) | FromSwarm::ListenFailure(_) | FromSwarm::NewListener(_) | FromSwarm::NewListenAddr(_) @@ -214,22 +207,32 @@ impl NetworkBehaviour for Behaviour { _cx: &mut Context<'_>, _params: &mut impl PollParameters, ) -> Poll>> { - let mut peer_to_dial = None; + // First dial the next peer which exist in the `dial` queue. + if let Some(peer_id) = self.dial.pop_back() { + debug!("Dial: {peer_id}"); + return Poll::Ready(ToSwarm::GenerateEvent(Event::Dial(peer_id.to_owned()))); + } + + // If there were none we move onto peers we want to re-dial. + let mut peer_to_retry = None; let now = Instant::now(); // Iterate over all peers and take the first one which has `next_dial` set and the // scheduled dial time has passed. - for (peer_id, status) in &self.peers { + for (peer_id, status) in &self.retry { if let Some(next_dial) = status.next_dial { if next_dial < now { - peer_to_dial = Some(peer_id.to_owned()); + peer_to_retry = Some(peer_id.to_owned()); + break; } } } - if let Some(peer_id) = peer_to_dial { + if let Some(peer_id) = peer_to_retry { // Unwrap safely as we know the peer exists. - let status = self.peers.get_mut(&peer_id).unwrap(); + let status = self.retry.get_mut(&peer_id).unwrap(); + + debug!("Re-dial attempt {} for peer {peer_id}", status.attempts); // Increment the peers dial attempts. status.attempts += 1; @@ -237,7 +240,6 @@ impl NetworkBehaviour for Behaviour { // Set the peers `next_dial` value to None. This get's set again if the dial attempt fails. 
status.next_dial = None; - debug!("Dial attempt {} for peer {peer_id}", status.attempts); return Poll::Ready(ToSwarm::GenerateEvent(Event::Dial(peer_id.to_owned()))); } diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index 3ab9d45ea..293a39d84 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -246,6 +246,7 @@ impl EventLoop { peer_id, connection_id, cause, + num_established, .. } => match cause { Some(ConnectionError::IO(error)) => { @@ -254,6 +255,10 @@ impl EventLoop { match error.to_string().as_str() { "timed out" => { debug!("Connection {connection_id:?} timed out with peer {peer_id}"); + self.swarm + .behaviour_mut() + .dialer + .connection_error(peer_id, num_established); } "closed by peer: 0" => { // We received an `ApplicationClose` with code 0 here which means the @@ -262,6 +267,10 @@ impl EventLoop { } _ => { warn!("Connection error occurred with peer {peer_id} on connection {connection_id:?}"); + self.swarm + .behaviour_mut() + .dialer + .connection_error(peer_id, num_established); } } } @@ -270,6 +279,10 @@ impl EventLoop { } Some(ConnectionError::Handler(_)) => { warn!("Connection handler error occurred with peer {peer_id}"); + self.swarm + .behaviour_mut() + .dialer + .connection_error(peer_id, num_established); } None => { debug!("Connection closed with peer {peer_id}"); @@ -326,13 +339,12 @@ impl EventLoop { mdns::Event::Discovered(list) => { for (peer_id, _multiaddr) in list { debug!("mDNS discovered a new peer: {peer_id}"); - self.swarm.behaviour_mut().dialer.peer_discovered(peer_id) + self.swarm.behaviour_mut().dialer.dial_peer(peer_id) } } mdns::Event::Expired(list) => { for (peer_id, _multiaddr) in list { trace!("mDNS peer has expired: {peer_id}"); - self.swarm.behaviour_mut().dialer.peer_expired(peer_id) } } }, @@ -370,7 +382,7 @@ impl EventLoop { // Only dial remote peers discovered via rendezvous server if peer_id != local_peer_id { debug!("Discovered peer {peer_id} at 
{address}"); - self.swarm.behaviour_mut().dialer.peer_discovered(peer_id); + self.swarm.behaviour_mut().dialer.dial_peer(peer_id); } } } @@ -383,7 +395,6 @@ impl EventLoop { } rendezvous::client::Event::Expired { peer } => { trace!("Peer registration with rendezvous expired: {peer:?}"); - self.swarm.behaviour_mut().dialer.peer_expired(peer); } }, SwarmEvent::Behaviour(BehaviourEvent::RendezvousServer(event)) => match event { From 4d09fd659a6acc13a8d75270c264ef3fc7886f06 Mon Sep 17 00:00:00 2001 From: adz Date: Fri, 14 Jul 2023 10:00:30 +0200 Subject: [PATCH 15/16] Remove unused dependency --- Cargo.lock | 1 - aquadoggo/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 67ffd5edd..05d63b77a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -130,7 +130,6 @@ dependencies = [ "tower-http", "tower-service", "triggered", - "void", ] [[package]] diff --git a/aquadoggo/Cargo.toml b/aquadoggo/Cargo.toml index f2ec875c5..63046fa9b 100644 --- a/aquadoggo/Cargo.toml +++ b/aquadoggo/Cargo.toml @@ -76,7 +76,6 @@ tower-http = { version = "0.3.4", default-features = false, features = [ "cors", ] } triggered = "0.1.2" -void = "1.0.2" [dev-dependencies] ciborium = "0.2.0" From c62e57049842cb9a78773a56827621d104d5d510 Mon Sep 17 00:00:00 2001 From: adz Date: Fri, 14 Jul 2023 10:01:46 +0200 Subject: [PATCH 16/16] Correct import grouping --- aquadoggo/src/network/dialer/behaviour.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/src/network/dialer/behaviour.rs b/aquadoggo/src/network/dialer/behaviour.rs index eed3ed1f8..326bb72a7 100644 --- a/aquadoggo/src/network/dialer/behaviour.rs +++ b/aquadoggo/src/network/dialer/behaviour.rs @@ -2,6 +2,7 @@ use std::collections::{HashMap, VecDeque}; use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; use exponential_backoff::Backoff; use libp2p::core::Endpoint; @@ -13,7 +14,6 @@ use libp2p::swarm::{ }; use libp2p::{Multiaddr, PeerId}; use log::{debug, warn}; -use 
std::time::{Duration, Instant}; const RETRY_LIMIT: u32 = 8; const BACKOFF_SEC_MIN: u64 = 1;