feat: dead peer detection #264

Merged
merged 1 commit into from May 15, 2023
7 changes: 7 additions & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions safenode/Cargo.toml
@@ -45,6 +45,7 @@ itertools = "~0.10.1"
lazy_static = "~1.4.0"
libp2p = { version="0.51", features = ["tokio", "dns", "kad", "macros", "request-response", "identify"] }
libp2p-quic = { version = "0.7.0-alpha.3", features = ["tokio"] }
lru_time_cache = "0.11.11"
opentelemetry = { version = "0.17", features = ["rt-tokio"], optional = true }
opentelemetry-otlp = { version = "0.10", optional = true }
opentelemetry-semantic-conventions = { version = "0.9.0", optional = true }
13 changes: 13 additions & 0 deletions safenode/src/network/event.rs
@@ -282,10 +282,23 @@ impl SwarmDriver {
                info!("Connection closed to Peer {peer_id} - {endpoint:?} - {cause:?}");
            }
            SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
                info!("Having OutgoingConnectionError {peer_id:?} - {error:?}");
                if let Some(peer_id) = peer_id {
                    if let Some(sender) = self.pending_dial.remove(&peer_id) {
                        info!("OutgoingConnectionError is due to a pending_dial to {peer_id}");
                        let _ = sender.send(Err(error.into()));
                    }
                    // A dead peer will cause a bunch of `OutgoingConnectionError`s
                    // to be received within a short period.
                    if let Some(value) = self.potential_dead_peers.get_mut(&peer_id) {
                        *value += 1;
                        if *value > 3 {
                            trace!("Detected dead peer {peer_id:?}");
                            let _ = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
                        }
                    } else {
                        let _ = self.potential_dead_peers.insert(peer_id, 1);
                    }
                }
            }
            SwarmEvent::IncomingConnectionError { .. } => {}
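
The counting rule in the hunk above can be looked at in isolation. The following is a minimal sketch, not the PR's code: `DeadPeerTracker`, `record_failure`, and `MAX_FAILURES` are hypothetical names, `PeerId` is replaced by a plain `u64` so the snippet compiles with the standard library only, and the time-based expiry of the real cache is left out.

```rust
use std::collections::HashMap;

/// Stand-in for `libp2p::PeerId` (assumption, keeps the sketch std-only).
type PeerId = u64;

/// Failures a peer may accumulate before it is treated as dead.
/// Mirrors the `*value > 3` check in the diff.
const MAX_FAILURES: usize = 3;

/// Hypothetical helper that isolates the counting logic.
#[derive(Default)]
struct DeadPeerTracker {
    failures: HashMap<PeerId, usize>,
}

impl DeadPeerTracker {
    /// Records one outgoing connection error for `peer`.
    /// Returns `true` once the peer has exceeded the threshold, which is
    /// the point where the diff calls `kademlia.remove_peer`.
    fn record_failure(&mut self, peer: PeerId) -> bool {
        // First failure stores 1, later failures increment, as in the diff.
        let count = self.failures.entry(peer).or_insert(0);
        *count += 1;
        *count > MAX_FAILURES
    }
}
```

Under this rule a peer is flagged on its fourth recorded failure, and every further failure flags it again because the counter is not reset after removal.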
21 changes: 21 additions & 0 deletions safenode/src/network/mod.rs
@@ -44,6 +44,7 @@ use libp2p::{
    swarm::{Swarm, SwarmBuilder},
    Multiaddr, PeerId, Transport,
};
use lru_time_cache::LruCache;
use std::{
    collections::{HashMap, HashSet},
    iter,
@@ -93,6 +94,22 @@ pub struct SwarmDriver {
    pending_get_closest_peers: PendingGetClosest,
    pending_requests: HashMap<RequestId, oneshot::Sender<Result<Response>>>,
    pending_query: HashMap<QueryId, oneshot::Sender<Result<QueryResponse>>>,
    // Kademlia uses a technique called `lazy refreshing` to periodically check
    // the responsiveness of nodes in its routing table, and attempts to
    // replace an unresponsive node with a new node from its list of known nodes.
    // However, an unreachable node prolongs the get_closest process considerably.
    // Although the unreachable node will be replaced by a new entry, there is a flaw:
    // a peer that drops out of the close range may be replaced by a peer in the far range,
    // and the latter may not trigger a replication.
    // That is because the replication range is defined by CLOSE_GROUP_SIZE (8),
    // whereas the replacement range is defined by K-VALUE (20).
    // If replication is only triggered by newly added peers,
    // there is a risk that data copies may not get replicated in time.
    // Hence, connection-based dead peer detection gives at least the following benefits:
    // 1. It makes get_closest_peers faster when unreachable nodes are present.
    //    Even in a larger network, it still brings some gain.
    // 2. It helps ensure that replication is correctly triggered for the affected part of the range.
    potential_dead_peers: LruCache<PeerId, usize>,
}

impl SwarmDriver {
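
The replication gap described in the struct comment can be illustrated with a toy model. This is a simplified sketch under stated assumptions: distances are plain integers rather than XOR distances, the routing table is a flat list of `K_VALUE` peers rather than real k-buckets, and `close_group` and `added_peer_triggers_replication` are hypothetical helpers, not functions from safenode or libp2p.

```rust
/// Replication range quoted in the code comment.
const CLOSE_GROUP_SIZE: usize = 8;
/// Replacement range quoted in the code comment (toy table size here).
const K_VALUE: usize = 20;

/// Returns the `CLOSE_GROUP_SIZE` smallest distances among `peers`.
/// Integer distances stand in for XOR distances (assumption).
fn close_group(peers: &[u32]) -> Vec<u32> {
    let mut sorted = peers.to_vec();
    sorted.sort_unstable();
    sorted.truncate(CLOSE_GROUP_SIZE);
    sorted
}

/// Hypothetical trigger rule: replicate only when the newly added peer
/// itself lands inside the close group.
fn added_peer_triggers_replication(peers_after_add: &[u32], added: u32) -> bool {
    close_group(peers_after_add).contains(&added)
}
```

For example, start with `K_VALUE` peers at distances 1 to 20, drop the peer at distance 3, and add a replacement at distance 25. The close group changes, because the peer at distance 9 moves into it, yet `added_peer_triggers_replication` returns `false` for the new peer. In this model nothing would prompt replication to the peer that just entered the close group, which is the risk the comment describes.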
@@ -268,6 +285,10 @@ impl SwarmDriver {
            pending_get_closest_peers: Default::default(),
            pending_requests: Default::default(),
            pending_query: Default::default(),
            potential_dead_peers: LruCache::with_expiry_duration_and_capacity(
                Duration::from_secs(10),
                50,
            ),
        };

        Ok((
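
The cache above is configured with a 10-second expiry and a capacity of 50, so failure counts only accumulate while errors for a peer keep arriving close together. The sketch below models that behaviour with the standard library only. `ExpiringCounts` and `bump` are hypothetical names, `PeerId` is a `u64` stand-in, and the choice to refresh an entry's timestamp on every access is an assumption about how `lru_time_cache` behaves, not something the diff confirms.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Stand-in for `libp2p::PeerId` (assumption, keeps the sketch std-only).
type PeerId = u64;

/// Hypothetical model of a counter cache with expiry and bounded size.
struct ExpiringCounts {
    window: Duration,
    capacity: usize,
    /// Per peer: (failure count, time of last touch).
    entries: HashMap<PeerId, (usize, Instant)>,
}

impl ExpiringCounts {
    fn new(window: Duration, capacity: usize) -> Self {
        Self { window, capacity, entries: HashMap::new() }
    }

    /// Increments the count for `peer` as of `now` and returns the new count.
    /// `now` is passed in so the behaviour can be checked without sleeping.
    fn bump(&mut self, peer: PeerId, now: Instant) -> usize {
        // Drop every entry that has not been touched within the window.
        let window = self.window;
        self.entries
            .retain(|_, (_, last)| now.saturating_duration_since(*last) < window);

        // When full, make room by evicting the least recently touched entry.
        if !self.entries.contains_key(&peer) && self.entries.len() >= self.capacity {
            let oldest = self
                .entries
                .iter()
                .min_by_key(|(_, (_, last))| *last)
                .map(|(id, _)| *id);
            if let Some(id) = oldest {
                self.entries.remove(&id);
            }
        }

        // Count the failure and refresh the timestamp (assumed behaviour).
        let entry = self.entries.entry(peer).or_insert((0, now));
        entry.0 += 1;
        entry.1 = now;
        entry.0
    }
}
```

With a 10-second window, two failures five seconds apart count as 2, while a failure arriving 15 seconds after the previous one starts again from 1. A peer that fails only occasionally therefore never reaches the dead-peer threshold.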