Skip to content

Commit

Permalink
fix: use new strategy to evict inbound peer
Browse files Browse the repository at this point in the history
  • Loading branch information
jjyr committed Dec 19, 2018
1 parent df414b0 commit 95451e7
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 64 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ tokio = "0.1.8"
futures = { version = "0.1.19", features = ["use_std"] }
snap = "0.2"
libp2p = { git = "https://github.com/libp2p/rust-libp2p", rev="cfdfca1a06fb2deb9ebcc15a63d715ebddb23bd0", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
ckb-time = { path = "../util/time" }
2 changes: 2 additions & 0 deletions network/src/ckb_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::protocol_service::ProtocolService;
use crate::CKBProtocolHandler;
use crate::Network;
use crate::PeerId;
use ckb_time::now_ms;
use futures::future::{self, Future};
use futures::Stream;
use libp2p::core::{Endpoint, Multiaddr, UniqueConnecState};
Expand Down Expand Up @@ -81,6 +82,7 @@ impl CKBService {
move |data| {
// update kad_system when we received data
kad_system.update_kbuckets(peer_id.clone());
network.modify_peer(&peer_id, |peer| peer.last_message_time = Some(now_ms()));
let protocol_handler = Arc::clone(&protocol_handler);
let network = Arc::clone(&network);
let handle_received = future::lazy(move || {
Expand Down
17 changes: 15 additions & 2 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use std::boxed::Box;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::usize;
use tokio::io::{AsyncRead, AsyncWrite};

Expand All @@ -51,7 +50,7 @@ pub struct SessionInfo {
pub struct PeerInfo {
pub peer_id: PeerId,
pub endpoint_role: Endpoint,
pub last_ping_time: Option<Instant>,
pub last_ping_time: Option<u64>,
pub connected_addr: Multiaddr,
pub identify_info: Option<PeerIdentifyInfo>,
}
Expand Down Expand Up @@ -106,6 +105,20 @@ impl Network {
peers_registry.connection_status()
}

pub(crate) fn modify_peer<F>(&self, peer_id: &PeerId, mut f: F) -> bool
where
F: FnMut(&mut PeerConnection) -> (),
{
let mut peers_registry = self.peers_registry.write();
match peers_registry.get_mut(peer_id) {
Some(peer) => {
f(peer);
true
}
None => false,
}
}

pub(crate) fn get_peer_identify_info(&self, peer_id: &PeerId) -> Option<PeerIdentifyInfo> {
let peers_registry = self.peers_registry.read();
peers_registry
Expand Down
24 changes: 16 additions & 8 deletions network/src/network_group.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use libp2p::core::{AddrComponent, Multiaddr};
use std::net::IpAddr;

pub type Group = Vec<u8>;
#[derive(Hash, Eq, PartialEq, Debug)]
pub enum Group {
NoGroup,
LocalNetwork,
IP4([u8; 2]),
IP6([u8; 4]),
}

pub trait NetworkGroup {
fn network_group(&self) -> Group;
Expand All @@ -22,28 +28,30 @@ impl NetworkGroup for Multiaddr {
fn network_group(&self) -> Group {
if let Some(ip_addr) = extract_ip_addr(self) {
if ip_addr.is_loopback() {
// Local NetworkGroup
return vec![1];
return Group::LocalNetwork;
}
// TODO uncomment after ip feature stable
// if !ip_addr.is_global() {
// // Global NetworkGroup
// return vec![2]
// return Group::GlobalNetwork
// }

// IPv4 NetworkGroup
if let IpAddr::V4(ipv4) = ip_addr {
return ipv4.octets()[0..2].to_vec();
let bits = ipv4.octets();
return Group::IP4([bits[0], bits[1]]);
}
// IPv6 NetworkGroup
if let IpAddr::V6(ipv6) = ip_addr {
if let Some(ipv4) = ipv6.to_ipv4() {
return ipv4.octets()[0..2].to_vec();
let bits = ipv4.octets();
return Group::IP4([bits[0], bits[1]]);
}
return ipv6.octets()[0..4].to_vec();
let bits = ipv6.octets();
return Group::IP6([bits[0], bits[1], bits[2], bits[3]]);
}
}
// Can't group addr
vec![0]
Group::NoGroup
}
}
142 changes: 97 additions & 45 deletions network/src/peers_registry.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::network_group::{Group, NetworkGroup};
use crate::peer_store::{PeerStore, Score};
use crate::peer_store::PeerStore;
use crate::{Error, ErrorKind, PeerId, PeerIndex, ProtocolId};
use bytes::Bytes;
use ckb_time::now_ms;
use ckb_util::RwLock;
use fnv::{FnvHashMap, FnvHashSet};
use futures::sync::mpsc::UnboundedSender;
Expand All @@ -12,7 +13,8 @@ use rand::seq::SliceRandom;
use rand::thread_rng;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

pub(crate) const EVICTION_PROTECT_PEERS: usize = 8;

struct PeerConnections {
id_allocator: AtomicUsize,
Expand Down Expand Up @@ -97,7 +99,10 @@ pub struct PeerConnection {
pub(crate) pinger_loader: UniqueConnec<ping::Pinger>,
pub identify_info: Option<PeerIdentifyInfo>,
pub(crate) ckb_protocols: Vec<ProtocolConnec>,
pub last_ping_time: Option<Instant>,
pub last_ping_time: Option<u64>,
pub last_message_time: Option<u64>,
pub ping: Option<u64>,
pub connected_time: Option<u64>,
}

impl PeerConnection {
Expand All @@ -108,7 +113,10 @@ impl PeerConnection {
pinger_loader: UniqueConnec::empty(),
identify_info: None,
ckb_protocols: Vec::with_capacity(1),
ping: None,
last_ping_time: None,
last_message_time: None,
connected_time: None,
peer_index: None,
}
}
Expand Down Expand Up @@ -172,6 +180,14 @@ fn find_most_peers_in_same_network_group<'a>(
.unwrap_or_else(Vec::new)
}

fn sort_then_drop_last_n_elements<T, F>(list: &mut Vec<T>, n: usize, compare: F)
where
F: FnMut(&T, &T) -> std::cmp::Ordering,
{
list.sort_by(compare);
list.truncate(list.len().saturating_sub(n));
}

impl PeersRegistry {
pub fn new(
peer_store: Arc<RwLock<Box<PeerStore>>>,
Expand Down Expand Up @@ -217,19 +233,16 @@ impl PeersRegistry {
))
.into());
}
let candidate_score = {
let peer_store = self.peer_store.read();
if peer_store.is_banned(&peer_id) {
return Err(
ErrorKind::InvalidNewPeer(format!("peer {:?} is denied", peer_id)).into(),
);
}
peer_store.peer_score_or_default(&peer_id)
};
if self.peer_store.read().is_banned(&peer_id) {
return Err(
ErrorKind::InvalidNewPeer(format!("peer {:?} is denied", peer_id)).into(),
);
}

let connection_status = self.connection_status();
// check peers connection limitation
if connection_status.unreserved_inbound >= self.max_inbound
&& !self.try_evict_inbound_peer(candidate_score)
&& !self.try_evict_inbound_peer()
{
return Err(ErrorKind::InvalidNewPeer(format!(
"reach max inbound peers limitation, reject peer {:?}",
Expand All @@ -242,42 +255,80 @@ impl PeersRegistry {
Ok(())
}

fn try_evict_inbound_peer(&mut self, candidate_score: Score) -> bool {
let peer_id = {
let inbound_peers = self
fn try_evict_inbound_peer(&mut self) -> bool {
let peer_id: PeerId = {
let mut candidate_peers = self
.peers
.iter()
.filter(|(peer_id, peer)| peer.is_inbound() && !self.is_reserved(peer_id));
let candidate_peers = find_most_peers_in_same_network_group(inbound_peers);
.filter(|(peer_id, peer)| peer.is_inbound() && !self.is_reserved(peer_id))
.collect::<Vec<_>>();
let peer_store = self.peer_store.read();

// must have less score than candidate_score
let mut lowest_score = candidate_score - 1;
let mut low_score_peers = Vec::new();
for peer_id in candidate_peers {
if let Some(score) = peer_store.peer_score(peer_id) {
if score > lowest_score {
continue;
}
if score < lowest_score {
lowest_score = score;
low_score_peers.clear();
}
low_score_peers.push(peer_id);
}
}
// failed to evict
if low_score_peers.is_empty() {
return false;
}
// Protect peers based on characteristics that an attacker hard to simulate or manipulate
// Protect peers which has the highest score
sort_then_drop_last_n_elements(
&mut candidate_peers,
EVICTION_PROTECT_PEERS,
|(peer_id1, _), (peer_id2, _)| {
let peer1_score = peer_store.peer_score(peer_id1).unwrap_or_default();
let peer2_score = peer_store.peer_score(peer_id2).unwrap_or_default();
peer1_score.cmp(&peer2_score)
},
);

// Protect peers which has the lowest ping
sort_then_drop_last_n_elements(
&mut candidate_peers,
EVICTION_PROTECT_PEERS,
|(_, peer1), (_, peer2)| {
let peer1_ping = peer1.ping.unwrap_or_else(|| std::u64::MAX);
let peer2_ping = peer2.ping.unwrap_or_else(|| std::u64::MAX);
peer2_ping.cmp(&peer1_ping)
},
);

// Protect peers which most recently sent messages
sort_then_drop_last_n_elements(
&mut candidate_peers,
EVICTION_PROTECT_PEERS,
|(_, peer1), (_, peer2)| {
let peer1_last_message_time = peer1.last_message_time.unwrap_or_default();
let peer2_last_message_time = peer2.last_message_time.unwrap_or_default();
peer1_last_message_time.cmp(&peer2_last_message_time)
},
);
candidate_peers.sort_by(|(_, peer1), (_, peer2)| {
let peer1_last_connected_at = peer1.connected_time.unwrap_or_else(|| std::u64::MAX);
let peer2_last_connected_at = peer2.connected_time.unwrap_or_else(|| std::u64::MAX);
peer2_last_connected_at.cmp(&peer1_last_connected_at)
});
// Protect half peers which have the longest connection time
let protect_peers = candidate_peers.len() / 2;
sort_then_drop_last_n_elements(
&mut candidate_peers,
protect_peers,
|(_, peer1), (_, peer2)| {
let peer1_last_connected_at =
peer1.connected_time.unwrap_or_else(|| std::u64::MAX);
let peer2_last_connected_at =
peer2.connected_time.unwrap_or_else(|| std::u64::MAX);
peer2_last_connected_at.cmp(&peer1_last_connected_at)
},
);

let mut evict_group =
find_most_peers_in_same_network_group(candidate_peers.into_iter());
let mut rng = thread_rng();
low_score_peers[..]
.choose(&mut rng)
.unwrap()
.to_owned()
.to_owned()
evict_group.shuffle(&mut rng);
// randomly evict a lowest scored peer
match evict_group
.iter()
.min_by_key(|peer_id| peer_store.peer_score(peer_id).unwrap_or_default())
{
Some(peer_id) => peer_id.to_owned().to_owned(),
None => return false,
}
};
debug!("evict inbound peer {:?}", peer_id);
debug!(target: "network", "evict inbound peer {:?}", peer_id);
self.drop_peer(&peer_id);
true
}
Expand Down Expand Up @@ -325,7 +376,8 @@ impl PeersRegistry {
self.peer_store
.write()
.new_connected_peer(&peer_id, connected_addr.clone());
let peer = PeerConnection::new(connected_addr, endpoint);
let mut peer = PeerConnection::new(connected_addr, endpoint);
peer.connected_time = Some(now_ms());
let peer_index = self.peers.or_insert(peer_id.clone(), peer);
debug!(target: "network", "allocate peer_index {} to peer {:?}", peer_index, peer_id);
peer_index
Expand Down
12 changes: 9 additions & 3 deletions network/src/ping_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::protocol_service::ProtocolService;
use crate::transport::TransportOutput;
use crate::Network;
use crate::PeerId;
use ckb_time::now_ms;
use futures::future::{self, Future};
use futures::stream::FuturesUnordered;
use futures::Stream;
Expand Down Expand Up @@ -140,21 +141,26 @@ impl<T: Send> ProtocolService<T> for PingService {
})
}
});
let ping_start_time = Instant::now();
let ping_start_time = now_ms();
let ping_future =
Future::then(Timeout::new(ping_future, ping_timeout), {
let network = Arc::clone(&network);
move |result| -> Result<(), IoError> {
let mut peer_store = network.peer_store().write();
match result {
Ok(peer_id) => {
let received_during = ping_start_time.elapsed();
let now = now_ms();
let ping = now - ping_start_time;
network.modify_peer(&peer_id, |peer| {
peer.ping = Some(ping);
peer.last_ping_time = Some(now);
});
peer_store.report(&peer_id, Behaviour::Ping);
trace!(
target: "network",
"received pong from {:?} in {:?}",
peer_id,
received_during
ping
);
Ok(())
}
Expand Down

0 comments on commit 95451e7

Please sign in to comment.