Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement dialer behaviour #444

Merged
merged 16 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Integrate replication manager with networking stack [#387](https://github.com/p2panda/aquadoggo/pull/387) 🥞
- Reverse lookup for pinned relations in dependency task [#434](https://github.com/p2panda/aquadoggo/pull/434)
- Persist and maintain index of operation's position in document [#438](https://github.com/p2panda/aquadoggo/pull/438)
- Introduce `dialer` behaviour with retry logic [#444](https://github.com/p2panda/aquadoggo/pull/444)

### Changed

Expand Down
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions aquadoggo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ deadqueue = { version = "0.2.3", default-features = false, features = [
directories = "4.0.1"
dynamic-graphql = "0.7.3"
envy = "0.4.2"
exponential-backoff = "1.2.0"
futures = "0.3.23"
hex = "0.4.3"
http = "0.2.9"
Expand Down Expand Up @@ -75,6 +76,7 @@ tower-http = { version = "0.3.4", default-features = false, features = [
"cors",
] }
triggered = "0.1.2"
void = "1.0.2"
adzialocha marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
ciborium = "0.2.0"
Expand Down
9 changes: 8 additions & 1 deletion aquadoggo/src/network/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use libp2p::{autonat, connection_limits, identify, mdns, ping, relay, rendezvous
use log::debug;

use crate::network::config::NODE_NAMESPACE;
use crate::network::peers;
use crate::network::NetworkConfiguration;
use crate::network::{dialer, peers};

/// How often do we broadcast mDNS queries into the network.
const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -72,6 +72,9 @@ pub struct Behaviour {

/// Register peer connections and handle p2panda messaging with them.
pub peers: peers::Behaviour,

/// Dial discovered peers and redial with backoff if all connections are lost.
pub dialer: dialer::Behaviour,
}

impl Behaviour {
Expand Down Expand Up @@ -171,6 +174,9 @@ impl Behaviour {
// Create behaviour to manage peer connections and handle p2panda messaging
let peers = peers::Behaviour::new();

// Create a behaviour to dial discovered peers.
let dialer = dialer::Behaviour::new();

Ok(Self {
autonat: autonat.into(),
identify: identify.into(),
Expand All @@ -182,6 +188,7 @@ impl Behaviour {
relay_client: relay_client.into(),
relay_server: relay_server.into(),
peers,
dialer,
})
}
}
246 changes: 246 additions & 0 deletions aquadoggo/src/network/dialer/behaviour.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::collections::HashMap;
use std::task::{Context, Poll};

use exponential_backoff::Backoff;
use libp2p::core::Endpoint;
use libp2p::swarm::derive_prelude::ConnectionEstablished;
use libp2p::swarm::dummy::ConnectionHandler as DummyConnectionHandler;
use libp2p::swarm::{
ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure, FromSwarm, NetworkBehaviour,
PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use libp2p::{Multiaddr, PeerId};
use log::{debug, warn};
use std::time::{Duration, Instant};
adzialocha marked this conversation as resolved.
Show resolved Hide resolved

const RETRY_LIMIT: u32 = 8;
const BACKOFF_SEC_MIN: u64 = 1;
const BACKOFF_SEC_MAX: u64 = 60;

/// Events which are sent to the swarm from the dialer.
#[derive(Clone, Debug)]
pub enum Event {
/// Event sent to request that the swarm dials a known peer.
Dial(PeerId),
}

/// Status of a peer known to the dialer behaviour.
#[derive(Clone, Debug, Default)]
pub struct PeerStatus {
/// Number of existing connections to this peer.
connections: u32,

/// Number of dial attempts to this peer since it was last disconnected.
///
/// Is reset to 0 once a connection is successfully established.
attempts: u32,

/// Time at which the peer should be dialed. Is None if this peer has a successfully
/// established connection.
next_dial: Option<Instant>,
}

/// Behaviour responsible for dialing peers discovered by the `swarm`. If all connections close to
/// a peer, then the dialer attempts to reconnect to the peer with backoff until `RETRY_LIMIT` is
/// reached.
#[derive(Debug)]
pub struct Behaviour {
/// Discovered peers we want to dial.
peers: HashMap<PeerId, PeerStatus>,

/// The backoff instance used for scheduling next_dials.
backoff: Backoff,
}

impl Behaviour {
pub fn new() -> Self {
let min = Duration::from_secs(BACKOFF_SEC_MIN);
let max = Duration::from_secs(BACKOFF_SEC_MAX);
let backoff = Backoff::new(RETRY_LIMIT, min, max);

Self {
peers: HashMap::new(),
backoff,
}
}

/// Add a peer to the map of known peers who should be dialed.
pub fn peer_discovered(&mut self, peer_id: PeerId) {
if self.peers.get(&peer_id).is_none() {
self.peers.insert(peer_id, PeerStatus::default());
self.schedule_dial(&peer_id);

Check warning on line 73 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L70-L73

Added lines #L70 - L73 were not covered by tests
}
}

/// Remove a peer from the map of known peers.
///
/// Called externally if a peers registration expires on the `swarm`.
pub fn peer_expired(&mut self, peer_id: PeerId) {

Check warning on line 80 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L80

Added line #L80 was not covered by tests
// If the peer doesn't exist this means it was already dropped because the RETRY_LIMIT was
// reached, so don't try and remove it again.
if self.peers.get(&peer_id).is_some() {
self.remove_peer(&peer_id)

Check warning on line 84 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L83-L84

Added lines #L83 - L84 were not covered by tests
}
}

/// Schedule a peer to be dialed.
///
/// Uses the configured `backoff` instance for the behaviour and the current `attempts` count
/// for the passed node to set the `next_dial` time for this peer.
///
/// If the `REDIAL_LIMIT` is reached then the peer is removed from the map of known peers and
/// no more redial attempts will occur.
fn schedule_dial(&mut self, peer_id: &PeerId) {
if let Some(status) = self.peers.get_mut(peer_id) {
if let Some(backoff_delay) = self.backoff.next(status.attempts) {
status.next_dial = Some(Instant::now() + backoff_delay);

Check warning on line 98 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L95-L98

Added lines #L95 - L98 were not covered by tests
} else {
debug!("Re-dial attempt limit reached: remove peer {peer_id:?}");
self.remove_peer(peer_id)

Check warning on line 101 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L100-L101

Added lines #L100 - L101 were not covered by tests
}
}
}

fn on_connection_established(&mut self, peer_id: PeerId) {
match self.peers.get_mut(&peer_id) {
Some(status) => {
status.connections += 1;

Check warning on line 109 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L106-L109

Added lines #L106 - L109 were not covered by tests
}
None => {
warn!("Connected established to unknown peer");

Check warning on line 112 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L112

Added line #L112 was not covered by tests
}
}
}

fn on_connection_closed(&mut self, peer_id: &PeerId) {
match self.peers.get_mut(peer_id) {
Some(status) => {
status.connections -= 1;

Check warning on line 120 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L117-L120

Added lines #L117 - L120 were not covered by tests

if status.connections == 0 {
self.schedule_dial(peer_id)

Check warning on line 123 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L122-L123

Added lines #L122 - L123 were not covered by tests
}
}
None => {
warn!("Connected closed to unknown peer");

Check warning on line 127 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L127

Added line #L127 was not covered by tests
}
}
}

fn on_dial_failed(&mut self, peer_id: &PeerId) {
match self.peers.get_mut(peer_id) {
Some(status) => {
if status.connections == 0 {
self.schedule_dial(peer_id)

Check warning on line 136 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L132-L136

Added lines #L132 - L136 were not covered by tests
}
}
None => warn!("Dial failed to unknown peer"),

Check warning on line 139 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L139

Added line #L139 was not covered by tests
}
}

fn remove_peer(&mut self, peer_id: &PeerId) {
if self.peers.remove(peer_id).is_none() {

Check warning on line 144 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L143-L144

Added lines #L143 - L144 were not covered by tests
// Don't warn here, peers can be removed twice if their registration on the swarm
// expired at the same time as their RETRY_LIMIT was reached.
}
}
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = DummyConnectionHandler;

type ToSwarm = Event;

fn handle_established_inbound_connection(

Check warning on line 156 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L156

Added line #L156 was not covered by tests
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(DummyConnectionHandler)

Check warning on line 163 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L163

Added line #L163 was not covered by tests
}

fn handle_established_outbound_connection(

Check warning on line 166 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L166

Added line #L166 was not covered by tests
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(DummyConnectionHandler)

Check warning on line 173 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L173

Added line #L173 was not covered by tests
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => {
self.on_connection_established(peer_id);

Check warning on line 179 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L178-L179

Added lines #L178 - L179 were not covered by tests
}
FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => {
self.on_connection_closed(&peer_id);

Check warning on line 182 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L181-L182

Added lines #L181 - L182 were not covered by tests
sandreae marked this conversation as resolved.
Show resolved Hide resolved
}
FromSwarm::DialFailure(DialFailure { peer_id, .. }) => {
if let Some(peer_id) = peer_id {
self.on_dial_failed(&peer_id);

Check warning on line 186 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L184-L186

Added lines #L184 - L186 were not covered by tests
} else {
warn!("Dial failed to unknown peer")

Check warning on line 188 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L188

Added line #L188 was not covered by tests
}
}
FromSwarm::AddressChange(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::ExternalAddrConfirmed(_)
| FromSwarm::ExternalAddrExpired(_) => {}
}
}

fn on_connection_handler_event(

Check warning on line 204 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L204

Added line #L204 was not covered by tests
&mut self,
_id: PeerId,
_: ConnectionId,
_: THandlerOutEvent<Self>,
) {
}

fn poll(
&mut self,
_cx: &mut Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
let mut peer_to_dial = None;
let now = Instant::now();

// Iterate over all peers and take the first one which has `next_dial` set and the
// scheduled dial time has passed.
for (peer_id, status) in &self.peers {
if let Some(next_dial) = status.next_dial {
if next_dial < now {
peer_to_dial = Some(peer_id.to_owned());

Check warning on line 225 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L223-L225

Added lines #L223 - L225 were not covered by tests
}
}
}

if let Some(peer_id) = peer_to_dial {
// Unwrap safely as we know the peer exists.
let status = self.peers.get_mut(&peer_id).unwrap();

Check warning on line 232 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L232

Added line #L232 was not covered by tests

// Increment the peers dial attempts.
status.attempts += 1;

Check warning on line 235 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L235

Added line #L235 was not covered by tests

// Set the peers `next_dial` value to None. This get's set again if the dial attempt fails.
status.next_dial = None;

Check warning on line 238 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L238

Added line #L238 was not covered by tests

debug!("Dial attempt {} for peer {peer_id}", status.attempts);
return Poll::Ready(ToSwarm::GenerateEvent(Event::Dial(peer_id.to_owned())));

Check warning on line 241 in aquadoggo/src/network/dialer/behaviour.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/network/dialer/behaviour.rs#L240-L241

Added lines #L240 - L241 were not covered by tests
}

Poll::Pending
}
}
5 changes: 5 additions & 0 deletions aquadoggo/src/network/dialer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

mod behaviour;

pub use behaviour::{Behaviour, Event};
1 change: 1 addition & 0 deletions aquadoggo/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

mod behaviour;
mod config;
mod dialer;
pub mod identity;
mod peers;
mod service;
Expand Down
14 changes: 4 additions & 10 deletions aquadoggo/src/network/peers/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use libp2p::swarm::{
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream as NegotiatedStream,
SubstreamProtocol,
};
use log::{debug, warn};
use log::warn;
use thiserror::Error;

use crate::network::peers::{Codec, CodecError, Protocol};
Expand Down Expand Up @@ -221,15 +221,9 @@ impl ConnectionHandler for Handler {
}
ConnectionEvent::DialUpgradeError(_)
| ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_) => {
warn!("Connection event error");
}
ConnectionEvent::LocalProtocolsChange(_) => {
debug!("ConnectionEvent: LocalProtocolsChange")
}
ConnectionEvent::RemoteProtocolsChange(_) => {
debug!("ConnectionEvent: RemoteProtocolsChange")
}
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => (),
}
}

Expand Down
Loading
Loading