From fc7defef14fdb573846b4f154f50b7c7e0eb0a4f Mon Sep 17 00:00:00 2001 From: zhangsoledad <787953403@qq.com> Date: Thu, 4 Apr 2019 20:33:33 +0800 Subject: [PATCH 1/3] refactor: avoid recursive lock --- network/src/benches/sqlite_peer_store.rs | 31 +-- network/src/network.rs | 136 ++++------ network/src/peer_store.rs | 12 +- network/src/peer_store/sqlite/peer_store.rs | 56 ++-- network/src/peers_registry.rs | 284 ++++++++------------ network/src/protocols/discovery.rs | 2 - network/src/protocols/identify.rs | 3 +- network/src/protocols/mod.rs | 4 +- network/src/protocols/outbound_peer.rs | 12 +- network/src/tests/peers_registry.rs | 173 +++++++++--- network/src/tests/sqlite_peer_store.rs | 22 +- 11 files changed, 369 insertions(+), 366 deletions(-) diff --git a/network/src/benches/sqlite_peer_store.rs b/network/src/benches/sqlite_peer_store.rs index f9fd11c049..c2917dc99f 100644 --- a/network/src/benches/sqlite_peer_store.rs +++ b/network/src/benches/sqlite_peer_store.rs @@ -8,16 +8,14 @@ use ckb_network::{ peer_store::{PeerStore, SqlitePeerStore}, PeerId, SessionType, }; -use ckb_util::Mutex; use criterion::Criterion; use std::rc::Rc; fn insert_peer_info_benchmark(c: &mut Criterion) { c.bench_function("insert 100 peer_info", |b| { b.iter({ - let mut peer_store = - SqlitePeerStore::memory("bench_db_insert_100_peer_info".to_string()) - .expect("memory"); + let peer_store = SqlitePeerStore::memory("bench_db_insert_100_peer_info".to_string()) + .expect("memory"); let peer_ids = (0..100).map(|_| PeerId::random()).collect::>(); let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap(); move || { @@ -29,9 +27,8 @@ fn insert_peer_info_benchmark(c: &mut Criterion) { }); c.bench_function("insert 1000 peer_info", |b| { b.iter({ - let mut peer_store = - SqlitePeerStore::memory("bench_db_insert_1000_peer_info".to_string()) - .expect("memory"); + let peer_store = SqlitePeerStore::memory("bench_db_insert_1000_peer_info".to_string()) + .expect("memory"); let peer_ids = (0..1000).map(|_| PeerId::random()).collect::>(); let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap(); move || { @@ -45,7 +42,7 @@ fn insert_peer_info_benchmark(c: &mut Criterion) { // filesystem benchmark c.bench_function("insert 100 peer_info on filesystem", move |b| { b.iter({ - let mut peer_store = SqlitePeerStore::temp().expect("temp"); + let peer_store = SqlitePeerStore::temp().expect("temp"); let peer_ids = (0..100).map(|_| PeerId::random()).collect::>(); let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap(); move || { @@ -59,12 +56,10 @@ fn insert_peer_info_benchmark(c: &mut Criterion) { fn random_order_benchmark(c: &mut Criterion) { { - let peer_store = Rc::new(Mutex::new( - SqlitePeerStore::memory("bench_db_random_order".to_string()).expect("memory"), - )); + let peer_store = + Rc::new(SqlitePeerStore::memory("bench_db_random_order".to_string()).expect("memory")); let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap(); { - let mut peer_store = peer_store.lock(); for _ in 0..8000 { let peer_id = PeerId::random(); peer_store.add_connected_peer(&peer_id, addr.clone(), SessionType::Outbound); @@ -79,10 +74,7 @@ fn random_order_benchmark(c: &mut Criterion) { move || { let peer_store = Rc::clone(&peer_store); let count = 1000; - assert_eq!( - peer_store.lock().peers_to_attempt(count).len() as u32, - count - ); + assert_eq!(peer_store.peers_to_attempt(count).len() as u32, count); } }) } @@ -95,10 +87,7 @@ fn random_order_benchmark(c: &mut Criterion) { move || { let peer_store = Rc::clone(&peer_store); let count = 2000; - assert_eq!( - peer_store.lock().peers_to_attempt(count).len() as u32, - count - ); + assert_eq!(peer_store.peers_to_attempt(count).len() as u32, count); } }) } @@ -110,7 +99,7 @@ fn random_order_benchmark(c: &mut Criterion) { "random order 1000 / 8000 peer_info on filesystem", move |b| { b.iter({ - let mut peer_store = SqlitePeerStore::temp().expect("temp"); + let peer_store = SqlitePeerStore::temp().expect("temp"); let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap(); for _ in 0..8000 { let peer_id = PeerId::random(); diff --git a/network/src/network.rs b/network/src/network.rs index 26fa284888..74990c3223 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -1,4 +1,4 @@ -use crate::errors::{Error, ProtocolError}; +use crate::errors::Error; use crate::peer_store::{sqlite::SqlitePeerStore, PeerStore, Status}; use crate::peers_registry::{ConnectionStatus, PeersRegistry, RegisterResult}; use crate::protocols::{ @@ -60,8 +60,8 @@ pub struct SessionInfo { pub struct NetworkState { protocol_ids: RwLock>, - pub(crate) peers_registry: RwLock, - peer_store: Arc>, + pub(crate) peers_registry: PeersRegistry, + peer_store: Arc, listened_addresses: RwLock>, pub(crate) original_listened_addresses: RwLock>, // For avoid repeat failed dial @@ -82,14 +82,14 @@ impl NetworkState { .chain(config.public_addresses.iter()) .map(|addr| (addr.to_owned(), std::u8::MAX)) .collect(); - let peer_store: Arc> = { - let mut peer_store = + let peer_store: Arc = { + let peer_store = SqlitePeerStore::file(config.peer_store_path().to_string_lossy().to_string())?; let bootnodes = config.bootnodes()?; for (peer_id, addr) in bootnodes { peer_store.add_bootnode(peer_id, addr); } - Arc::new(RwLock::new(peer_store)) + Arc::new(peer_store) }; let reserved_peers = config @@ -108,8 +108,8 @@ impl NetworkState { Ok(NetworkState { peer_store, config, + peers_registry, failed_dials: RwLock::new(LruCache::new(FAILED_DIAL_CACHE_SIZE)), - peers_registry: RwLock::new(peers_registry), listened_addresses: RwLock::new(listened_addresses), original_listened_addresses: RwLock::new(Vec::new()), local_private_key: local_private_key.clone(), @@ -120,12 +120,12 @@ impl NetworkState { pub fn report(&self, peer_id: &PeerId, behaviour: Behaviour) { info!(target: "network", "report {:?} because {:?}", peer_id, behaviour); - self.peer_store.write().report(peer_id, behaviour); + self.peer_store.report(peer_id, behaviour); } pub fn drop_peer(&self, p2p_control: &mut ServiceControl, peer_id: &PeerId) { debug!(target: "network", "drop peer {:?}", peer_id); - if let Some(peer) = self.peers_registry.write().drop_peer(&peer_id) { + if let Some(peer) = self.peers_registry.drop_peer(&peer_id) { if let Err(err) = p2p_control.disconnect(peer.session_id) { error!(target: "network", "disconnect peer error {:?}", err); } @@ -135,16 +135,17 @@ impl NetworkState { pub fn drop_all(&self, p2p_control: &mut ServiceControl) { debug!(target: "network", "drop all connections..."); let mut peer_ids = Vec::new(); - let mut peers_registry = self.peers_registry.write(); - for (peer_id, peer) in peers_registry.peers_iter() { - peer_ids.push(peer_id.clone()); - if let Err(err) = p2p_control.disconnect(peer.session_id) { - error!(target: "network", "disconnect peer error {:?}", err); + { + for (peer_id, peer) in self.peers_registry.peers_guard().read().iter() { + peer_ids.push(peer_id.clone()); + if let Err(err) = p2p_control.disconnect(peer.session_id) { + error!(target: "network", "disconnect peer error {:?}", err); + } } } - peers_registry.drop_all(); + self.peers_registry.drop_all(); - let mut peer_store = self.peer_store().write(); + let peer_store = self.peer_store(); for peer_id in peer_ids { if peer_store.peer_status(&peer_id) != Status::Disconnected { peer_store.report(&peer_id, Behaviour::UnexpectedDisconnect); @@ -167,36 +168,30 @@ impl NetworkState { } pub(crate) fn get_peer_index(&self, peer_id: &PeerId) -> Option { - let peers_registry = self.peers_registry.read(); - peers_registry.get(&peer_id).map(|peer| peer.peer_index) + self.peers_registry + .peers_guard() + .read() + .get(&peer_id) + .map(|peer| peer.peer_index) } pub(crate) fn get_peer_id(&self, peer_index: PeerIndex) -> Option { - let peers_registry = self.peers_registry.read(); - peers_registry - .get_peer_id(peer_index) - .map(|peer_id| peer_id.to_owned()) + self.peers_registry.get_peer_id(peer_index) } pub(crate) fn connection_status(&self) -> ConnectionStatus { - let peers_registry = self.peers_registry.read(); - peers_registry.connection_status() + self.peers_registry.connection_status() } pub(crate) fn modify_peer(&self, peer_id: &PeerId, f: F) where F: FnOnce(&mut Peer) -> (), { - let mut peers_registry = self.peers_registry.write(); - if let Some(peer) = peers_registry.get_mut(peer_id) { - f(peer); - } + self.peers_registry.modify_peer(peer_id, f); } pub(crate) fn peers_indexes(&self) -> Vec { - let peers_registry = self.peers_registry.read(); - let iter = peers_registry.connected_peers_indexes(); - iter.collect::>() + self.peers_registry.connected_peers_indexes() } pub(crate) fn ban_peer( @@ -206,10 +201,10 @@ impl NetworkState { timeout: Duration, ) { self.drop_peer(p2p_control, peer_id); - self.peer_store.write().ban_peer(peer_id, timeout); + self.peer_store.ban_peer(peer_id, timeout); } - pub(crate) fn peer_store(&self) -> &RwLock { + pub(crate) fn peer_store(&self) -> &Arc { &self.peer_store } @@ -236,10 +231,7 @@ impl NetworkState { // A workaround method for `add_node` rpc call, need to re-write it after new p2p lib integration. pub fn add_node(&self, peer_id: &PeerId, address: Multiaddr) { - let _ = self - .peer_store() - .write() - .add_discovered_addr(peer_id, address); + let _ = self.peer_store().add_discovered_addr(peer_id, address); } fn to_external_url(&self, addr: &Multiaddr) -> String { @@ -255,33 +247,14 @@ impl NetworkState { protocol_id: ProtocolId, protocol_version: ProtocolVersion, ) -> Result { - let mut peers_registry = self.peers_registry.write(); - let register_result = if session_type.is_outbound() { - peers_registry.try_outbound_peer( - peer_id.clone(), - connected_addr, - session_id, - session_type, - ) - } else { - peers_registry.accept_inbound_peer( - peer_id.clone(), - connected_addr, - session_id, - session_type, - ) - }?; - // add session to peer - match peers_registry.get_mut(&peer_id) { - Some(peer) => match peer.protocol_version(protocol_id) { - Some(_) => return Err(ProtocolError::Duplicate(protocol_id).into()), - None => { - peer.protocols.insert(protocol_id, protocol_version); - } - }, - None => unreachable!("get peer after inserted"), - } - Ok(register_result) + self.peers_registry.accept_connection( + peer_id, + connected_addr, + session_id, + session_type, + protocol_id, + protocol_version, + ) } pub fn peer_protocol_version( @@ -289,20 +262,25 @@ impl NetworkState { peer_id: &PeerId, protocol_id: ProtocolId, ) -> Option { - let peers_registry = self.peers_registry.read(); - peers_registry + self.peers_registry + .peers_guard() + .read() .get(peer_id) .and_then(|peer| peer.protocol_version(protocol_id)) } + pub fn session_info(&self, peer_id: &PeerId, protocol_id: ProtocolId) -> Option { - let peers_registry = self.peers_registry.read(); - peers_registry.get(peer_id).map(|peer| { - let protocol_version = peer.protocol_version(protocol_id); - SessionInfo { - peer: peer.clone(), - protocol_version, - } - }) + self.peers_registry + .peers_guard() + .read() + .get(peer_id) + .map(|peer| { + let protocol_version = peer.protocol_version(protocol_id); + SessionInfo { + peer: peer.clone(), + protocol_version, + } + }) } pub fn get_protocol_ids bool>(&self, filter: F) -> Vec { @@ -398,7 +376,7 @@ impl ServiceHandle for EventHandler { .map(|pubkey| pubkey.peer_id()) .expect("Secio must enabled"); - let mut peer_store = self.network_state.peer_store().write(); + let peer_store = self.network_state.peer_store(); if peer_store.peer_status(&peer_id) == Status::Connected { peer_store.report(&peer_id, Behaviour::UnexpectedDisconnect); peer_store.update_status(&peer_id, Status::Disconnected); @@ -432,7 +410,7 @@ impl ServiceHandle for EventHandler { Ok(register_result) => { // update status in peer_store if let RegisterResult::New(_) = register_result { - let mut peer_store = self.network_state.peer_store().write(); + let peer_store = self.network_state.peer_store(); peer_store.update_status(&peer_id, Status::Connected); } } @@ -596,7 +574,6 @@ impl NetworkService { let bootnodes = self .network_state .peer_store() - .read() .bootnodes(max((config.max_outbound_peers / 2) as u32, 1)) .clone(); // dial half bootnodes @@ -681,12 +658,13 @@ impl NetworkController { } pub fn connected_peers(&self) -> Vec<(PeerId, Peer, MultiaddrList)> { - let peer_store = self.network_state.peer_store().read(); + let peer_store = self.network_state.peer_store(); self.network_state .peers_registry + .peers_guard() .read() - .peers_iter() + .iter() .map(|(peer_id, peer)| { ( peer_id.clone(), diff --git a/network/src/peer_store.rs b/network/src/peer_store.rs index 8860b404a5..78029c2955 100644 --- a/network/src/peer_store.rs +++ b/network/src/peer_store.rs @@ -30,18 +30,18 @@ impl Default for PeerScoreConfig { pub trait PeerStore: Send + Sync { /// Add a peer and address into peer_store /// this method will assume peer is connected, which implies address is "verified". - fn add_connected_peer(&mut self, peer_id: &PeerId, address: Multiaddr, endpoint: SessionType); + fn add_connected_peer(&self, peer_id: &PeerId, address: Multiaddr, endpoint: SessionType); /// Add discovered peer addresses /// this method will assume peer and addr is untrust since we have not connected to it. - fn add_discovered_addr(&mut self, peer_id: &PeerId, address: Multiaddr) -> bool; + fn add_discovered_addr(&self, peer_id: &PeerId, address: Multiaddr) -> bool; /// Report peer behaviours - fn report(&mut self, peer_id: &PeerId, behaviour: Behaviour) -> ReportResult; + fn report(&self, peer_id: &PeerId, behaviour: Behaviour) -> ReportResult; /// Update peer status - fn update_status(&mut self, peer_id: &PeerId, status: Status); + fn update_status(&self, peer_id: &PeerId, status: Status); fn peer_status(&self, peer_id: &PeerId) -> Status; fn peer_score(&self, peer_id: &PeerId) -> Option; /// Add bootnode - fn add_bootnode(&mut self, peer_id: PeerId, addr: Multiaddr); + fn add_bootnode(&self, peer_id: PeerId, addr: Multiaddr); /// This method randomly return peers, it return bootnodes if no other peers in PeerStore. fn bootnodes(&self, count: u32) -> Vec<(PeerId, Multiaddr)>; /// Get addrs of a peer, note a peer may have multiple addrs @@ -54,7 +54,7 @@ pub trait PeerStore: Send + Sync { /// Randomly get peers fn random_peers(&self, count: u32) -> Vec<(PeerId, Multiaddr)>; /// Ban a peer - fn ban_peer(&mut self, peer_id: &PeerId, timeout: Duration); + fn ban_peer(&self, peer_id: &PeerId, timeout: Duration); /// Check peer ban status fn is_banned(&self, peer_id: &PeerId) -> bool; /// peer score config diff --git a/network/src/peer_store/sqlite/peer_store.rs b/network/src/peer_store/sqlite/peer_store.rs index 10d1c20622..5ab99300c9 100644 --- a/network/src/peer_store/sqlite/peer_store.rs +++ b/network/src/peer_store/sqlite/peer_store.rs @@ -15,6 +15,7 @@ use crate::peer_store::{ Behaviour, Multiaddr, PeerId, PeerScoreConfig, PeerStore, ReportResult, Score, Status, }; use crate::SessionType; +use ckb_util::RwLock; use faketime::unix_time; use fnv::FnvHashMap; use std::time::Duration; @@ -30,17 +31,17 @@ const DEFAULT_POOL_SIZE: u32 = 32; const DEFAULT_ADDRS: u32 = 3; pub struct SqlitePeerStore { - bootnodes: Vec<(PeerId, Multiaddr)>, + bootnodes: RwLock>, peer_score_config: PeerScoreConfig, - ban_list: FnvHashMap, Duration>, + ban_list: RwLock, Duration>>, pub(crate) pool: ConnectionPool, } impl SqlitePeerStore { pub fn new(connection_pool: ConnectionPool, peer_score_config: PeerScoreConfig) -> Self { - let mut peer_store = SqlitePeerStore { - bootnodes: Vec::new(), - ban_list: Default::default(), + let peer_store = SqlitePeerStore { + bootnodes: RwLock::new(Vec::new()), + ban_list: RwLock::new(Default::default()), pool: connection_pool, peer_score_config, }; @@ -63,31 +64,32 @@ impl SqlitePeerStore { Self::file("".into()) } - fn prepare(&mut self) -> Result<(), DBError> { + fn prepare(&self) -> Result<(), DBError> { self.create_tables()?; self.reset_status()?; self.load_banlist() } - fn create_tables(&mut self) -> Result<(), DBError> { + fn create_tables(&self) -> Result<(), DBError> { self.pool.fetch(|conn| db::create_tables(conn)) } - fn reset_status(&mut self) -> Result { + fn reset_status(&self) -> Result { self.pool.fetch(|conn| db::PeerInfo::reset_status(conn)) } - fn load_banlist(&mut self) -> Result<(), DBError> { + fn load_banlist(&self) -> Result<(), DBError> { self.clear_expires_banned_ip()?; let now = unix_time(); let ban_records = self.pool.fetch(|conn| db::get_ban_records(conn, now))?; + let mut guard = self.ban_list.write(); for (ip, ban_time) in ban_records { - self.ban_list.insert(ip, ban_time); + guard.insert(ip, ban_time); } Ok(()) } - fn ban_ip(&mut self, addr: &Multiaddr, timeout: Duration) { + fn ban_ip(&self, addr: &Multiaddr, timeout: Duration) { let ip = { match addr.extract_ip_addr_binary() { Some(binary) => binary, @@ -100,8 +102,9 @@ impl SqlitePeerStore { .fetch(|conn| db::insert_ban_record(&conn, &ip, ban_time)) .expect("ban ip"); } - self.ban_list.insert(ip, ban_time); - if self.ban_list.len() > BAN_LIST_CLEAR_EXPIRES_SIZE { + let mut guard = self.ban_list.write(); + guard.insert(ip, ban_time); + if guard.len() > BAN_LIST_CLEAR_EXPIRES_SIZE { self.clear_expires_banned_ip().expect("clear ban list"); } } @@ -112,25 +115,26 @@ impl SqlitePeerStore { None => return false, }; let now = unix_time(); - match self.ban_list.get(&ip) { + match self.ban_list.read().get(&ip) { Some(ban_time) => *ban_time > now, None => false, } } - fn clear_expires_banned_ip(&mut self) -> Result<(), DBError> { + fn clear_expires_banned_ip(&self) -> Result<(), DBError> { let now = unix_time(); let ips = self .pool .fetch(|conn| db::clear_expires_banned_ip(conn, now))?; + let mut guard = self.ban_list.write(); for ip in ips { - self.ban_list.remove(&ip); + guard.remove(&ip); } Ok(()) } /// check and try delete peer_info if peer_infos reach limit - fn check_store_limit(&mut self) -> Result<(), ()> { + fn check_store_limit(&self) -> Result<(), ()> { let peer_info_count = self .pool .fetch(|conn| db::PeerInfo::count(conn)) @@ -167,7 +171,7 @@ impl SqlitePeerStore { Ok(()) } - fn fetch_peer_info(&mut self, peer_id: &PeerId) -> db::PeerInfo { + fn fetch_peer_info(&self, peer_id: &PeerId) -> db::PeerInfo { let blank_addr = &Multiaddr::from_bytes(Vec::new()).expect("null multiaddr"); self.pool .fetch(|conn| { @@ -207,7 +211,7 @@ impl SqlitePeerStore { } impl PeerStore for SqlitePeerStore { - fn add_connected_peer(&mut self, peer_id: &PeerId, addr: Multiaddr, endpoint: SessionType) { + fn add_connected_peer(&self, peer_id: &PeerId, addr: Multiaddr, endpoint: SessionType) { if self.check_store_limit().is_err() { return; } @@ -243,7 +247,7 @@ impl PeerStore for SqlitePeerStore { .expect("upsert peer info"); } - fn add_discovered_addr(&mut self, peer_id: &PeerId, addr: Multiaddr) -> bool { + fn add_discovered_addr(&self, peer_id: &PeerId, addr: Multiaddr) -> bool { // peer store is full if self.check_store_limit().is_err() { return false; @@ -256,7 +260,7 @@ impl PeerStore for SqlitePeerStore { inserted > 0 } - fn report(&mut self, peer_id: &PeerId, behaviour: Behaviour) -> ReportResult { + fn report(&self, peer_id: &PeerId, behaviour: Behaviour) -> ReportResult { if self.is_banned(peer_id) { return ReportResult::Banned; } @@ -272,7 +276,7 @@ impl PeerStore for SqlitePeerStore { ReportResult::Ok } - fn update_status(&mut self, peer_id: &PeerId, status: Status) { + fn update_status(&self, peer_id: &PeerId, status: Status) { if let Some(peer) = self.get_peer_info(peer_id) { self.pool .fetch(|conn| db::PeerInfo::update_status(&conn, peer.id, status)) @@ -290,14 +294,14 @@ impl PeerStore for SqlitePeerStore { self.get_peer_info(peer_id).map(|peer| peer.score) } - fn add_bootnode(&mut self, peer_id: PeerId, addr: Multiaddr) { - self.bootnodes.push((peer_id, addr)); + fn add_bootnode(&self, peer_id: PeerId, addr: Multiaddr) { + self.bootnodes.write().push((peer_id, addr)); } // should return high scored nodes if possible, otherwise, return boostrap nodes fn bootnodes(&self, count: u32) -> Vec<(PeerId, Multiaddr)> { let mut peers = self.peers_to_attempt(count); if peers.len() < count as usize { - for (peer_id, addr) in &self.bootnodes { + for (peer_id, addr) in self.bootnodes.read().iter() { let peer = (peer_id.to_owned(), addr.to_owned()); if !peers.contains(&peer) { peers.push(peer); @@ -348,7 +352,7 @@ impl PeerStore for SqlitePeerStore { .expect("get random peers") } - fn ban_peer(&mut self, peer_id: &PeerId, timeout: Duration) { + fn ban_peer(&self, peer_id: &PeerId, timeout: Duration) { if let Some(peer) = self.get_peer_info(peer_id) { self.ban_ip(&peer.connected_addr, timeout); } diff --git a/network/src/peers_registry.rs b/network/src/peers_registry.rs index 0ccb13ceab..a3794b0f2a 100644 --- a/network/src/peers_registry.rs +++ b/network/src/peers_registry.rs @@ -1,7 +1,7 @@ use crate::peer_store::PeerStore; use crate::{ errors::{Error, PeerError}, - Peer, PeerId, PeerIndex, SessionType, + Peer, PeerId, PeerIndex, ProtocolId, ProtocolVersion, SessionType, }; use ckb_util::RwLock; use fnv::{FnvHashMap, FnvHashSet}; @@ -9,7 +9,6 @@ use log::debug; use p2p::{multiaddr::Multiaddr, SessionId}; use rand::seq::SliceRandom; use rand::thread_rng; -use std::collections::hash_map::Entry; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -30,78 +29,6 @@ impl RegisterResult { } } -struct PeerManage { - id_allocator: AtomicUsize, - peers: FnvHashMap, - pub(crate) peer_id_by_index: FnvHashMap, -} - -impl PeerManage { - #[inline] - fn get(&self, peer_id: &PeerId) -> Option<&Peer> { - self.peers.get(peer_id) - } - - #[inline] - fn get_peer_id(&self, peer_index: PeerIndex) -> Option<&PeerId> { - self.peer_id_by_index.get(&peer_index) - } - - #[inline] - fn get_mut(&mut self, peer_id: &PeerId) -> Option<&mut Peer> { - self.peers.get_mut(peer_id) - } - - #[inline] - fn remove(&mut self, peer_id: &PeerId) -> Option { - if let Some(peer) = self.peers.remove(peer_id) { - self.peer_id_by_index.remove(&peer.peer_index); - return Some(peer); - } - None - } - - #[inline] - fn iter(&self) -> impl Iterator { - self.peers.iter() - } - #[inline] - fn add_peer( - &mut self, - peer_id: PeerId, - connected_addr: Multiaddr, - session_id: SessionId, - session_type: SessionType, - ) -> RegisterResult { - match self.peers.entry(peer_id.clone()) { - Entry::Occupied(entry) => RegisterResult::Exist(entry.get().peer_index), - Entry::Vacant(entry) => { - let peer_index = self.id_allocator.fetch_add(1, Ordering::Relaxed); - let peer = Peer::new(peer_index, connected_addr, session_id, session_type); - entry.insert(peer); - self.peer_id_by_index.insert(peer_index, peer_id); - RegisterResult::New(peer_index) - } - } - } - - fn clear(&mut self) { - self.peers.clear(); - self.peer_id_by_index.clear(); - self.id_allocator.store(0, Ordering::Relaxed) - } -} - -impl Default for PeerManage { - fn default() -> Self { - PeerManage { - id_allocator: AtomicUsize::new(0), - peers: FnvHashMap::with_capacity_and_hasher(20, Default::default()), - peer_id_by_index: FnvHashMap::with_capacity_and_hasher(20, Default::default()), - } - } -} - #[derive(Clone, Copy, Debug)] pub struct ConnectionStatus { pub total: u32, @@ -112,9 +39,10 @@ pub struct ConnectionStatus { } pub(crate) struct PeersRegistry { - // store all known peers - peer_store: Arc>, - peers: PeerManage, + id_allocator: AtomicUsize, + peers: RwLock>, + peer_id_by_index: RwLock>, + peer_store: Arc, // max inbound limitation max_inbound: u32, // max outbound limitation @@ -154,7 +82,7 @@ where impl PeersRegistry { pub fn new( - peer_store: Arc>, + peer_store: Arc, max_inbound: u32, max_outbound: u32, reserved_only: bool, @@ -166,8 +94,13 @@ impl PeersRegistry { reserved_peers_set.insert(reserved_peer); } PeersRegistry { + id_allocator: AtomicUsize::new(0), + peers: RwLock::new(FnvHashMap::with_capacity_and_hasher(20, Default::default())), + peer_id_by_index: RwLock::new(FnvHashMap::with_capacity_and_hasher( + 20, + Default::default(), + )), peer_store, - peers: Default::default(), reserved_peers: reserved_peers_set, max_inbound, max_outbound, @@ -175,87 +108,83 @@ impl PeersRegistry { } } - #[inline] - pub fn get_peer_id(&self, peer_index: PeerIndex) -> Option<&PeerId> { - self.peers.get_peer_id(peer_index) + pub fn get_peer_id(&self, peer_index: PeerIndex) -> Option { + self.peer_indexes_guard().read().get(&peer_index).cloned() } pub fn is_reserved(&self, peer_id: &PeerId) -> bool { self.reserved_peers.contains(&peer_id) } - pub fn accept_inbound_peer( - &mut self, + pub(crate) fn accept_connection( + &self, peer_id: PeerId, - addr: Multiaddr, + connected_addr: Multiaddr, session_id: SessionId, session_type: SessionType, + protocol_id: ProtocolId, + protocol_version: ProtocolVersion, ) -> Result { - if let Some(peer) = self.peers.get(&peer_id) { + let mut peers = self.peers.write(); + + if let Some(peer) = peers.get(&peer_id) { return Ok(RegisterResult::Exist(peer.peer_index)); } - if !self.is_reserved(&peer_id) { - if self.reserved_only { - return Err(Error::Peer(PeerError::NonReserved(peer_id))); - } - if self.peer_store.read().is_banned(&peer_id) { - return Err(Error::Peer(PeerError::Banned(peer_id))); - } - let connection_status = self.connection_status(); - // check peers connection limitation - if connection_status.unreserved_inbound >= self.max_inbound - && !self.try_evict_inbound_peer() - { - return Err(Error::Peer(PeerError::ReachMaxInboundLimit(peer_id))); - } - } - Ok(self.register_peer(peer_id, addr, session_id, session_type)) - } + let inbound = session_type.is_inbound(); + let mut peer_id_by_index = self.peer_id_by_index.write(); - pub fn try_outbound_peer( - &mut self, - peer_id: PeerId, - addr: Multiaddr, - session_id: SessionId, - session_type: SessionType, - ) -> Result { - if let Some(peer) = self.peers.get(&peer_id) { - return Ok(RegisterResult::Exist(peer.peer_index)); - } if !self.is_reserved(&peer_id) { if self.reserved_only { return Err(Error::Peer(PeerError::NonReserved(peer_id))); } - if self.peer_store.read().is_banned(&peer_id) { + // ban_list lock acquired + if self.peer_store.is_banned(&peer_id) { return Err(Error::Peer(PeerError::Banned(peer_id))); } - let connection_status = self.connection_status(); + + let connection_status = self._connection_status(peers.iter()); // check peers connection limitation - // TODO: implement extra outbound peer logic - if connection_status.unreserved_outbound >= self.max_outbound { + if inbound { + if connection_status.unreserved_inbound >= self.max_inbound + && !self._try_evict_inbound_peer(&mut peers, &mut peer_id_by_index) + { + return Err(Error::Peer(PeerError::ReachMaxInboundLimit(peer_id))); + } + } else if connection_status.unreserved_outbound >= self.max_outbound { return Err(Error::Peer(PeerError::ReachMaxOutboundLimit(peer_id))); } } - Ok(self.register_peer(peer_id, addr, session_id, session_type)) + self.peer_store + .add_connected_peer(&peer_id, connected_addr.clone(), session_type); + let peer_index = self.id_allocator.fetch_add(1, Ordering::Relaxed); + let mut peer = Peer::new(peer_index, connected_addr, session_id, session_type); + peer.protocols.insert(protocol_id, protocol_version); + peers.insert(peer_id.clone(), peer); + peer_id_by_index.insert(peer_index, peer_id); + Ok(RegisterResult::New(peer_index)) } - fn try_evict_inbound_peer(&mut self) -> bool { + fn _try_evict_inbound_peer( + &self, + peers: &mut FnvHashMap, + peer_id_by_index: &mut FnvHashMap, + ) -> bool { let peer_id: PeerId = { - let mut candidate_peers = self - .peers - .iter() - .filter(|(peer_id, peer)| peer.is_inbound() && !self.is_reserved(peer_id)) - .collect::>(); - let peer_store = self.peer_store.read(); + let mut candidate_peers = { + peers + .iter() + .filter(|(peer_id, peer)| peer.is_inbound() && !self.is_reserved(peer_id)) + .collect::>() + }; // Protect peers based on characteristics that an attacker hard to simulate or manipulate // Protect peers which has the highest score sort_then_drop_last_n_elements( &mut candidate_peers, EVICTION_PROTECT_PEERS, |(peer_id1, _), (peer_id2, _)| { - let peer1_score = peer_store.peer_score(peer_id1).unwrap_or_default(); - let peer2_score = peer_store.peer_score(peer_id2).unwrap_or_default(); + let peer1_score = self.peer_store.peer_score(peer_id1).unwrap_or_default(); + let peer2_score = self.peer_store.peer_score(peer_id2).unwrap_or_default(); peer1_score.cmp(&peer2_score) }, ); @@ -308,52 +237,33 @@ impl PeersRegistry { // randomly evict a lowest scored peer match evict_group .iter() - .min_by_key(|peer_id| peer_store.peer_score(peer_id).unwrap_or_default()) + .min_by_key(|peer_id| self.peer_store.peer_score(peer_id).unwrap_or_default()) { Some(peer_id) => peer_id.to_owned().to_owned(), None => return false, } }; debug!(target: "network", "evict inbound peer {:?}", peer_id); - self.drop_peer(&peer_id); + self._drop_peer(&peer_id, peers, peer_id_by_index); true } - // registry a new peer - fn register_peer( - &mut self, - peer_id: PeerId, - connected_addr: Multiaddr, - session_id: SessionId, - session_type: SessionType, - ) -> RegisterResult { - self.peer_store - .write() - .add_connected_peer(&peer_id, connected_addr.clone(), session_type); - self.peers - .add_peer(peer_id, connected_addr, session_id, session_type) - } - - #[inline] - pub(crate) fn peers_iter(&self) -> impl Iterator { - self.peers.iter() - } - - #[inline] - pub fn get(&self, peer_id: &PeerId) -> Option<&Peer> { - self.peers.get(peer_id) - } - - #[inline] - pub fn get_mut(&mut self, peer_id: &PeerId) -> Option<&mut Peer> { - self.peers.get_mut(peer_id) + pub fn modify_peer( + &self, + peer_id: &PeerId, + callback: impl FnOnce(&mut Peer) -> R, + ) -> Option { + self.peers.write().get_mut(peer_id).map(callback) } - pub fn connection_status(&self) -> ConnectionStatus { + pub fn _connection_status<'a>( + &self, + peers: impl Iterator, + ) -> ConnectionStatus { let mut total: u32 = 0; let mut unreserved_inbound: u32 = 0; let mut unreserved_outbound: u32 = 0; - for (peer_id, peer_connection) in self.peers.iter() { + for (peer_id, peer_connection) in peers { total += 1; if self.is_reserved(peer_id) { continue; @@ -373,19 +283,61 @@ impl PeersRegistry { } } - #[inline] - pub fn connected_peers_indexes(&self) -> impl Iterator + '_ { - self.peers.peer_id_by_index.iter().map(|(k, _v)| *k) + pub fn connection_status(&self) -> ConnectionStatus { + self._connection_status(self.peers.read().iter()) } #[inline] - pub fn drop_peer(&mut self, peer_id: &PeerId) -> Option { - self.peers.remove(peer_id) + pub fn connected_peers_indexes(&self) -> Vec { + self.peer_id_by_index + .read() + .iter() + .map(|(k, _v)| *k) + .collect::>() + } + + fn _drop_peer( + &self, + peer_id: &PeerId, + peers: &mut FnvHashMap, + peer_id_by_index: &mut FnvHashMap, + ) -> Option { + if let Some(peer) = peers.remove(peer_id) { + peer_id_by_index.remove(&peer.peer_index); + return Some(peer); + } + None } #[inline] - pub fn drop_all(&mut self) { + pub fn drop_peer(&self, peer_id: &PeerId) -> Option { + let mut peers = self.peers.write(); + let mut peer_id_by_index = self.peer_id_by_index.write(); + self._drop_peer(peer_id, &mut peers, &mut peer_id_by_index) + } + + pub fn peers_guard(&self) -> &RwLock> { + &self.peers + } + + fn peer_indexes_guard(&self) -> &RwLock> { + &self.peer_id_by_index + } + + fn _drop_all( + &self, + peers: &mut FnvHashMap, + peer_id_by_index: &mut FnvHashMap, + ) { + peers.clear(); + peer_id_by_index.clear(); + self.id_allocator.store(0, Ordering::Relaxed) + } + + pub fn drop_all(&self) { debug!(target: "network", "drop_all"); - self.peers.clear() + let mut peers = self.peers.write(); + let mut peer_id_by_index = self.peer_id_by_index.write(); + self._drop_all(&mut peers, &mut peer_id_by_index); } } diff --git a/network/src/protocols/discovery.rs b/network/src/protocols/discovery.rs index 360a7deedb..5ba78ea339 100644 --- a/network/src/protocols/discovery.rs +++ b/network/src/protocols/discovery.rs @@ -203,7 +203,6 @@ impl Stream for DiscoveryService { let _ = self .network_state .peer_store() - .write() .add_discovered_addr(&peer_id, addr); } } @@ -220,7 +219,6 @@ impl Stream for DiscoveryService { let addrs = self .network_state .peer_store() - .read() .random_peers(n as u32) .into_iter() .filter_map(|(peer_id, mut addr)| { diff --git a/network/src/protocols/identify.rs b/network/src/protocols/identify.rs index 5bd06340d1..2519b0731f 100644 --- a/network/src/protocols/identify.rs +++ b/network/src/protocols/identify.rs @@ -57,7 +57,7 @@ impl Callback for IdentifyCallback { ); self.remote_listen_addrs .insert(peer_id.clone(), addrs.clone()); - let mut peer_store = self.network_state.peer_store().write(); + let peer_store = self.network_state.peer_store(); for addr in addrs { let _ = peer_store.add_discovered_addr(&peer_id, addr); } @@ -104,7 +104,6 @@ impl Callback for IdentifyCallback { let _ = self .network_state .peer_store() - .write() .add_discovered_addr(local_peer_id, transformed_addr); } // NOTE: for future usage diff --git a/network/src/protocols/mod.rs b/network/src/protocols/mod.rs index bce567e39e..47679622a7 100644 --- a/network/src/protocols/mod.rs +++ b/network/src/protocols/mod.rs @@ -161,7 +161,7 @@ impl ServiceProtocol for CKBHandler { Ok(register_result) => { // update status in peer_store if let RegisterResult::New(_) = register_result { - let mut peer_store = network.peer_store().write(); + let peer_store = network.peer_store(); peer_store.report(&peer_id, Behaviour::Connect); peer_store.update_status(&peer_id, Status::Connected); } @@ -340,6 +340,7 @@ impl CKBProtocolContext for DefaultCKBProtocolContext { let session_id = self .network_state .peers_registry + .peers_guard() .read() .get(&peer_id) .ok_or_else(|| PeerError::NotFound(peer_id.to_owned())) @@ -365,7 +366,6 @@ impl CKBProtocolContext for DefaultCKBProtocolContext { if self .network_state .peer_store() - .write() .report(&peer_id, behaviour) .is_banned() { diff --git a/network/src/protocols/outbound_peer.rs b/network/src/protocols/outbound_peer.rs index 513543fe39..e54d60557b 100644 --- a/network/src/protocols/outbound_peer.rs +++ b/network/src/protocols/outbound_peer.rs @@ -30,11 +30,7 @@ impl OutboundPeerService { } fn attempt_dial_peers(&mut self, count: u32) { - let attempt_peers = self - .network_state - .peer_store() - .read() - .peers_to_attempt(count + 5); + let attempt_peers = self.network_state.peer_store().peers_to_attempt(count + 5); let mut p2p_control = self.p2p_control.clone(); trace!(target: "network", "count={}, attempt_peers: {:?}", count, attempt_peers); for (peer_id, addr) in attempt_peers @@ -61,11 +57,7 @@ impl OutboundPeerService { } fn feeler_peers(&mut self, count: u32) { - let peers = self - .network_state - .peer_store() - .read() - .peers_to_feeler(count); + let peers = self.network_state.peer_store().peers_to_feeler(count); let mut p2p_control = self.p2p_control.clone(); for (peer_id, addr) in peers .into_iter() diff --git a/network/src/tests/peers_registry.rs b/network/src/tests/peers_registry.rs index 7e1eb4fd3e..0e0f0de4e7 100644 --- a/network/src/tests/peers_registry.rs +++ b/network/src/tests/peers_registry.rs @@ -2,16 +2,18 @@ use crate::{ multiaddr::ToMultiaddr, peer_store::{PeerStore, SqlitePeerStore}, peers_registry::{PeersRegistry, EVICTION_PROTECT_PEERS}, - Behaviour, PeerId, SessionType, + Behaviour, PeerId, ProtocolId, ProtocolVersion, SessionType, }; -use ckb_util::RwLock; use std::sync::Arc; use std::time::{Duration, Instant}; -fn new_peer_store() -> Arc> { - Arc::new(RwLock::new(SqlitePeerStore::temp().expect("temp"))) +fn new_peer_store() -> Arc { + Arc::new(SqlitePeerStore::temp().expect("temp")) } +const TEST_PROTOCOL_ID: ProtocolId = 0; +const TEST_PROTOCOL_VERSION: ProtocolVersion = 0; + #[test] fn test_accept_inbound_peer_in_reserve_only_mode() { let peer_store = new_peer_store(); @@ -21,7 +23,7 @@ fn test_accept_inbound_peer_in_reserve_only_mode() { let session_type = SessionType::Inbound; // reserved_only mode: only accept reserved_peer - let mut peers = PeersRegistry::new( + let peers = PeersRegistry::new( Arc::clone(&peer_store), 3, 3, @@ -29,14 +31,24 @@ fn test_accept_inbound_peer_in_reserve_only_mode() { vec![reserved_peer.clone()], ); assert!(peers - .accept_inbound_peer(PeerId::random(), addr.clone(), session_id, session_type) + .accept_connection( + PeerId::random(), + addr.clone(), + session_id, + session_type, + 0, + 0 + ) .is_err()); + peers - .accept_inbound_peer( + .accept_connection( reserved_peer.clone(), addr.clone(), session_id, session_type, + 0, + 0, ) .expect("accept"); } @@ -49,7 +61,7 @@ fn test_accept_inbound_peer_until_full() { let session_id = 1; let session_type = SessionType::Inbound; // accept node until inbound connections is full - let mut peers = PeersRegistry::new( + let peers = PeersRegistry::new( Arc::clone(&peer_store), 3, 3, @@ -57,30 +69,67 @@ fn test_accept_inbound_peer_until_full() { vec![reserved_peer.clone()], ); peers - .accept_inbound_peer(PeerId::random(), addr.clone(), session_id, session_type) + .accept_connection( + PeerId::random(), + addr.clone(), + session_id, + session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION, + ) .expect("accept"); peers - .accept_inbound_peer(PeerId::random(), addr.clone(), session_id, session_type) + .accept_connection( + PeerId::random(), + addr.clone(), + session_id, + session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION, + ) .expect("accept"); peers - .accept_inbound_peer(PeerId::random(), addr.clone(), session_id, session_type) + .accept_connection( + PeerId::random(), + addr.clone(), + session_id, + session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION, + ) .expect("accept"); println!("{:?}", peers.connection_status()); assert!(peers - .accept_inbound_peer(PeerId::random(), addr.clone(), session_id, session_type) + .accept_connection( + PeerId::random(), + addr.clone(), + session_id, + session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION + ) .is_err(),); // should still accept reserved peer peers - .accept_inbound_peer( + .accept_connection( reserved_peer.clone(), addr.clone(), session_id, session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION, ) .expect("accept"); // should refuse accept low score peer assert!(peers - .accept_inbound_peer(PeerId::random(), addr.clone(), session_id, session_type) + .accept_connection( + PeerId::random(), + addr.clone(), + session_id, + session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION + ) .is_err()); } @@ -101,7 +150,7 @@ fn test_accept_inbound_peer_eviction() { // prepare protected peers let longest_connection_time_peers_count = 5; let protected_peers_count = 3 * EVICTION_PROTECT_PEERS + longest_connection_time_peers_count; - let mut peers_registry = PeersRegistry::new( + let peers_registry = PeersRegistry::new( Arc::clone(&peer_store), (protected_peers_count + longest_connection_time_peers_count) as u32, 3, @@ -110,17 +159,29 @@ fn test_accept_inbound_peer_eviction() { ); for _ in 0..protected_peers_count { assert!(peers_registry - .accept_inbound_peer(PeerId::random(), addr2.clone(), session_id, session_type) + .accept_connection( + PeerId::random(), + addr2.clone(), + session_id, + session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION + ) .is_ok()); } - let mut peers_iter = peers_registry - .peers_iter() - .map(|(peer_id, _)| peer_id.to_owned()) - .collect::>() - .into_iter(); + let peers: Vec<_> = { + peers_registry + .peers_guard() + .read() + .iter() + .map(|(peer_id, _)| peer_id) + .cloned() + .collect() + }; + + let mut peers_iter = peers.iter(); // higest scored peers { - let mut peer_store = peer_store.write(); for _ in 0..EVICTION_PROTECT_PEERS { let peer_id = peers_iter.next().unwrap(); peer_store.report(&peer_id, Behaviour::Ping); @@ -130,8 +191,9 @@ fn test_accept_inbound_peer_eviction() { // lowest ping peers for _ in 0..EVICTION_PROTECT_PEERS { let peer_id = peers_iter.next().unwrap(); - let mut peer = peers_registry.get_mut(&peer_id).unwrap(); - peer.ping = Some(Duration::from_secs(0)); + peers_registry.modify_peer(&peer_id, |peer| { + peer.ping = Some(Duration::from_secs(0)); + }); } // to prevent time error, we set now to 10ago. @@ -139,69 +201,82 @@ fn test_accept_inbound_peer_eviction() { // peers which most recently sent messages for _ in 0..EVICTION_PROTECT_PEERS { let peer_id = peers_iter.next().unwrap(); - let mut peer = peers_registry.get_mut(&peer_id).unwrap(); - peer.last_message_time = Some(now + Duration::from_secs(10)); + peers_registry.modify_peer(&peer_id, |peer| { + peer.last_message_time = Some(now + Duration::from_secs(10)); + }); } // protect 5 peers which have the longest connection time for _ in 0..longest_connection_time_peers_count { let peer_id = peers_iter.next().unwrap(); - let mut peer = peers_registry.get_mut(&peer_id).unwrap(); - peer.connected_time = now - Duration::from_secs(10); + peers_registry.modify_peer(&peer_id, |peer| { + peer.connected_time = now - Duration::from_secs(10); + }); } let mut new_peer_ids = (0..3).map(|_| PeerId::random()).collect::>(); // setup 3 node and 1 reserved node from addr1 peers_registry - .accept_inbound_peer( + .accept_connection( reserved_peer.clone(), addr1.clone(), session_id, session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION, ) .expect("accept"); peers_registry - .accept_inbound_peer( + .accept_connection( evict_target.clone(), addr1.clone(), session_id, session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION, ) .expect("accept"); peers_registry - .accept_inbound_peer( + .accept_connection( new_peer_ids[0].clone(), addr1.clone(), session_id, session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION, ) .expect("accept"); peers_registry - .accept_inbound_peer( + .accept_connection( new_peer_ids[1].clone(), addr1.clone(), session_id, session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION, ) .expect("accept"); // setup 2 node from addr2 peers_registry - .accept_inbound_peer( + .accept_connection( lowest_score_peer.clone(), addr2.clone(), session_id, session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION, ) .expect("accept"); peers_registry - .accept_inbound_peer( + .accept_connection( new_peer_ids[2].clone(), addr2.clone(), session_id, session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION, ) .expect("accept"); // setup score { - let mut peer_store = peer_store.write(); peer_store.report(&lowest_score_peer, Behaviour::FailedToPing); peer_store.report(&lowest_score_peer, Behaviour::FailedToPing); peer_store.report(&lowest_score_peer, Behaviour::FailedToPing); @@ -216,14 +291,30 @@ fn test_accept_inbound_peer_eviction() { lowest_score_peer.clone(), ]); for peer_id in new_peer_ids { - let mut peer = peers_registry.get_mut(&peer_id).unwrap(); - // push the connected_time to make sure peer is unprotect - peer.connected_time = now + Duration::from_secs(10); + peers_registry.modify_peer(&peer_id, |peer| { + // push the connected_time to make sure peer is unprotect + peer.connected_time = now + Duration::from_secs(10); + }); } // should evict evict target - assert!(peers_registry.get(&evict_target).is_some()); + assert!(peers_registry + .peers_guard() + .read() + .get(&evict_target) + .is_some()); peers_registry - .accept_inbound_peer(PeerId::random(), addr1.clone(), session_id, session_type) + .accept_connection( + PeerId::random(), + addr1.clone(), + session_id, + session_type, + TEST_PROTOCOL_ID, + TEST_PROTOCOL_VERSION, + ) .expect("accept"); - assert!(peers_registry.get(&evict_target).is_none()); + assert!(peers_registry + .peers_guard() + .read() + .get(&evict_target) + .is_none()); } diff --git a/network/src/tests/sqlite_peer_store.rs b/network/src/tests/sqlite_peer_store.rs index 8c2f9e176c..5e76b43fc5 100644 --- a/network/src/tests/sqlite_peer_store.rs +++ b/network/src/tests/sqlite_peer_store.rs @@ -19,7 +19,7 @@ fn new_peer_store() -> SqlitePeerStore { #[test] fn test_add_connected_peer() { - let mut peer_store: Box = Box::new(new_peer_store()); + let peer_store: Box = Box::new(new_peer_store()); let peer_id = PeerId::random(); let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap(); peer_store.add_connected_peer(&peer_id, addr, SessionType::Outbound); @@ -32,7 +32,7 @@ fn test_add_connected_peer() { #[test] fn test_add_discovered_addr() { - let mut peer_store: Box = Box::new(new_peer_store()); + let peer_store: Box = Box::new(new_peer_store()); let peer_id = PeerId::random(); peer_store.add_discovered_addr(&peer_id, "/ip4/127.0.0.1".to_multiaddr().unwrap()); assert_eq!(peer_store.peer_addrs(&peer_id, 2).unwrap().len(), 1); @@ -40,7 +40,7 @@ fn test_add_discovered_addr() { #[test] fn test_report() { - let mut peer_store: Box = Box::new(new_peer_store()); + let peer_store: Box = Box::new(new_peer_store()); let peer_id = PeerId::random(); assert!(peer_store.report(&peer_id, Behaviour::Ping).is_ok()); assert!( @@ -51,7 +51,7 @@ fn test_report() { #[test] fn test_update_status() { - let mut peer_store: Box = Box::new(new_peer_store()); + let peer_store: Box = Box::new(new_peer_store()); let peer_id = PeerId::random(); peer_store.update_status(&peer_id, Status::Connected); assert_eq!(peer_store.peer_status(&peer_id), Status::Unknown); @@ -63,7 +63,7 @@ fn test_update_status() { #[test] fn test_ban_peer() { - let mut peer_store: Box = Box::new(new_peer_store()); + let peer_store: Box = Box::new(new_peer_store()); let peer_id = PeerId::random(); peer_store.ban_peer(&peer_id, Duration::from_secs(10)); assert!(!peer_store.is_banned(&peer_id)); @@ -75,7 +75,7 @@ fn test_ban_peer() { #[test] fn test_attepmt_ban() { - let mut peer_store: Box = Box::new(new_peer_store()); + let peer_store: Box = Box::new(new_peer_store()); let peer_id = PeerId::random(); let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap(); peer_store.add_connected_peer(&peer_id, addr.clone(), SessionType::Inbound); @@ -87,7 +87,7 @@ fn test_attepmt_ban() { #[test] fn test_bootnodes() { - let mut peer_store: Box = Box::new(new_peer_store()); + let peer_store: Box = Box::new(new_peer_store()); assert!(peer_store.bootnodes(1).is_empty()); let peer_id = PeerId::random(); let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap(); @@ -103,7 +103,7 @@ fn test_bootnodes() { #[test] fn test_peers_to_attempt() { - let mut peer_store: Box = Box::new(new_peer_store()); + let peer_store: Box = Box::new(new_peer_store()); assert!(peer_store.peers_to_attempt(1).is_empty()); let peer_id = PeerId::random(); let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap(); @@ -118,7 +118,7 @@ fn test_peers_to_attempt() { #[test] fn test_peers_to_feeler() { - let mut peer_store: Box = Box::new(new_peer_store()); + let peer_store: Box = Box::new(new_peer_store()); assert!(peer_store.peers_to_feeler(1).is_empty()); let peer_id = PeerId::random(); let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap(); @@ -139,7 +139,7 @@ fn test_peers_to_feeler() { #[test] fn test_random_peers() { - let mut peer_store: Box = Box::new(new_peer_store()); + let peer_store: Box = Box::new(new_peer_store()); assert!(peer_store.random_peers(1).is_empty()); let peer_id = PeerId::random(); let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap(); @@ -157,7 +157,7 @@ fn test_random_peers() { #[test] fn test_delete_peer_info() { - let mut peer_store = new_peer_store(); + let peer_store = new_peer_store(); let addr1 = "/ip4/127.0.0.1".to_multiaddr().unwrap(); let addr2 = "/ip4/192.163.1.1".to_multiaddr().unwrap(); let now = faketime::unix_time(); From d600e922c5e210f0592bee176d5907edb5b631da Mon Sep 17 00:00:00 2001 From: zhangsoledad <787953403@qq.com> Date: Wed, 10 Apr 2019 17:51:17 +0800 Subject: [PATCH 2/3] fix: accept_connection protocol registry --- network/src/peers_registry.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/network/src/peers_registry.rs b/network/src/peers_registry.rs index a3794b0f2a..f24f5742c2 100644 --- a/network/src/peers_registry.rs +++ b/network/src/peers_registry.rs @@ -127,7 +127,8 @@ impl PeersRegistry { ) -> Result { let mut peers = self.peers.write(); - if let Some(peer) = peers.get(&peer_id) { + if let Some(peer) = peers.get_mut(&peer_id) { + peer.protocols.insert(protocol_id, protocol_version); return Ok(RegisterResult::Exist(peer.peer_index)); } From 641f146b4aa6f68bcf26905226decbd26d555ee7 Mon Sep 17 00:00:00 2001 From: zhangsoledad <787953403@qq.com> Date: Wed, 10 Apr 2019 17:52:24 +0800 Subject: [PATCH 3/3] refactor: get rid of `let _ =` pattern --- core/src/script.rs | 2 +- network/src/network.rs | 4 ++- network/src/protocols/discovery.rs | 8 ++++-- network/src/protocols/identify.rs | 19 ++++++++----- rpc/src/module/miner.rs | 7 +++-- rpc/src/module/pool.rs | 7 +++-- rpc/src/module/trace.rs | 7 +++-- sync/src/net_time_checker.rs | 16 ++++++++--- sync/src/relayer/block_proposal_process.rs | 6 +++- sync/src/relayer/compact_block_process.rs | 7 ++++- .../src/relayer/get_block_proposal_process.rs | 6 +++- .../relayer/get_block_transactions_process.rs | 7 +++-- sync/src/relayer/mod.rs | 28 +++++++++++++++---- sync/src/relayer/transaction_process.rs | 8 ++++-- sync/src/synchronizer/get_blocks_process.rs | 12 ++++++-- sync/src/synchronizer/get_headers_process.rs | 12 ++++++-- sync/src/synchronizer/mod.rs | 21 +++++++++++--- 17 files changed, 135 insertions(+), 42 deletions(-) diff --git a/core/src/script.rs b/core/src/script.rs index 79fd464f9b..2daa140b1f 100644 --- a/core/src/script.rs +++ b/core/src/script.rs @@ -25,7 +25,7 @@ fn prefix_hex(bytes: &[u8]) -> String { let mut dst = vec![0u8; bytes.len() * 2 + 2]; dst[0] = b'0'; dst[1] = b'x'; - let _ = hex_encode(bytes, &mut dst[2..]); + hex_encode(bytes, &mut dst[2..]).expect("hex encode buffer checked"); unsafe { String::from_utf8_unchecked(dst) } } diff --git a/network/src/network.rs b/network/src/network.rs index 74990c3223..1b1a6fed10 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -231,7 +231,9 @@ impl NetworkState { // A workaround method for `add_node` rpc call, need to re-write it after new p2p lib integration. pub fn add_node(&self, peer_id: &PeerId, address: Multiaddr) { - let _ = self.peer_store().add_discovered_addr(peer_id, address); + if !self.peer_store().add_discovered_addr(peer_id, address) { + warn!(target: "network", "add_node failed {:?}", peer_id); + } } fn to_external_url(&self, addr: &Multiaddr) -> String { diff --git a/network/src/protocols/discovery.rs b/network/src/protocols/discovery.rs index 5ba78ea339..87b24ec204 100644 --- a/network/src/protocols/discovery.rs +++ b/network/src/protocols/discovery.rs @@ -200,10 +200,14 @@ impl Stream for DiscoveryService { _ => true, }) .collect::(); - let _ = self + + if !self .network_state .peer_store() - .add_discovered_addr(&peer_id, addr); + .add_discovered_addr(&peer_id, addr) + { + warn!(target: "network", "add_discovered_addr failed {:?}", peer_id); + } } } } diff --git a/network/src/protocols/identify.rs b/network/src/protocols/identify.rs index 2519b0731f..0589050bd6 100644 --- a/network/src/protocols/identify.rs +++ b/network/src/protocols/identify.rs @@ -1,9 +1,6 @@ // use crate::peer_store::Behaviour; use crate::NetworkState; -use std::collections::HashMap; -use std::sync::Arc; - -use log::{debug, trace}; +use log::{debug, trace, warn}; use p2p::{ multiaddr::{Multiaddr, Protocol}, secio::PeerId, @@ -11,6 +8,8 @@ use p2p::{ utils::{is_reachable, multiaddr_to_socketaddr}, }; use p2p_identify::{Callback, MisbehaveResult, Misbehavior}; +use std::collections::HashMap; +use std::sync::Arc; const MAX_RETURN_LISTEN_ADDRS: usize = 10; @@ -59,7 +58,9 @@ impl Callback for IdentifyCallback { .insert(peer_id.clone(), addrs.clone()); let peer_store = self.network_state.peer_store(); for addr in addrs { - let _ = peer_store.add_discovered_addr(&peer_id, addr); + if !peer_store.add_discovered_addr(&peer_id, addr) { + warn!(target: "network", "add_discovered_addr failed {:?}", peer_id); + } } } @@ -101,10 +102,14 @@ impl Callback for IdentifyCallback { { debug!(target: "network", "identify add transformed addr: {:?}", transformed_addr); let local_peer_id = self.network_state.local_peer_id(); - let _ = self + + if !self .network_state .peer_store() - .add_discovered_addr(local_peer_id, transformed_addr); + .add_discovered_addr(local_peer_id, transformed_addr) + { + warn!(target: "network", "add_discovered_addr failed {:?}", local_peer_id); + } } // NOTE: for future usage MisbehaveResult::Continue diff --git a/rpc/src/module/miner.rs b/rpc/src/module/miner.rs index b023da5b39..d514773905 100644 --- a/rpc/src/module/miner.rs +++ b/rpc/src/module/miner.rs @@ -11,7 +11,7 @@ use flatbuffers::FlatBufferBuilder; use jsonrpc_core::{Error, Result}; use jsonrpc_derive::rpc; use jsonrpc_types::{Block, BlockTemplate}; -use log::debug; +use log::{debug, warn}; use numext_fixed_hash::H256; use std::collections::HashSet; use std::sync::Arc; @@ -72,7 +72,10 @@ impl MinerRpc for MinerRpcImpl { RelayMessage::build_compact_block(fbb, &block, &HashSet::new()); fbb.finish(message, None); for peer in nc.connected_peers() { - let _ = nc.send(peer, fbb.finished_data().to_vec()); + let ret = nc.send(peer, fbb.finished_data().to_vec()); + if ret.is_err() { + warn!(target: "rpc", "relay block error {:?}", ret); + } } }, ); diff --git a/rpc/src/module/pool.rs b/rpc/src/module/pool.rs index 288837afca..e499ebc3cc 100644 --- a/rpc/src/module/pool.rs +++ b/rpc/src/module/pool.rs @@ -12,7 +12,7 @@ use flatbuffers::FlatBufferBuilder; use jsonrpc_core::Result; use jsonrpc_derive::rpc; use jsonrpc_types::Transaction; -use log::debug; +use log::{debug, warn}; use numext_fixed_hash::H256; #[rpc] @@ -61,7 +61,10 @@ impl PoolRpc for PoolRpcImpl { |mut nc| { for peer in nc.connected_peers() { debug!(target: "rpc", "relay transaction {} to peer#{}", tx_hash, peer); - let _ = nc.send(peer, fbb.finished_data().to_vec()); + let ret = nc.send(peer, fbb.finished_data().to_vec()); + if ret.is_err() { + warn!(target: "rpc", "relay transaction error {:?}", ret); + } } }, ); diff --git a/rpc/src/module/trace.rs b/rpc/src/module/trace.rs index b5f6ef6176..e37f753e27 100644 --- a/rpc/src/module/trace.rs +++ b/rpc/src/module/trace.rs @@ -13,7 +13,7 @@ use flatbuffers::FlatBufferBuilder; use jsonrpc_core::Result; use jsonrpc_derive::rpc; use jsonrpc_types::Transaction; -use log::debug; +use log::{debug, warn}; use numext_fixed_hash::H256; #[rpc] @@ -60,7 +60,10 @@ impl TraceRpc for TraceRpcImpl { |mut nc| { for peer in nc.connected_peers() { debug!(target: "rpc", "relay transaction {} to peer#{}", tx_hash, peer); - let _ = nc.send(peer, fbb.finished_data().to_vec()); + let ret = nc.send(peer, fbb.finished_data().to_vec()); + if ret.is_err() { + warn!(target: "rpc", "relay transaction error {:?}", ret); + } } }, ); diff --git a/sync/src/net_time_checker.rs b/sync/src/net_time_checker.rs index 3cf14965b6..6a909d0bf9 100644 --- a/sync/src/net_time_checker.rs +++ b/sync/src/net_time_checker.rs @@ -94,8 +94,10 @@ impl CKBProtocolHandler for NetTimeProtocol { fn received(&self, nc: Box, peer: PeerIndex, data: Bytes) { if nc.session_info(peer).map(|s| s.peer.is_outbound()) != Some(true) { info!(target: "network", "Peer {} is not outbound but sends us time message", peer); - let _ = nc.report_peer(peer, Behaviour::UnexpectedMessage); - return; + let ret = nc.report_peer(peer, Behaviour::UnexpectedMessage); + if ret.is_err() { + warn!(target: "network", "report_peer peer {:?} UnexpectedMessage error {:?}", peer, ret); + } } let timestamp = match get_root::(&data) @@ -106,7 +108,10 @@ impl CKBProtocolHandler for NetTimeProtocol { Some(timestamp) => timestamp, None => { info!(target: "network", "Peer {} sends us malformed message", peer); - let _ = nc.report_peer(peer, Behaviour::UnexpectedMessage); + let ret = nc.report_peer(peer, Behaviour::UnexpectedMessage); + if ret.is_err() { + warn!(target: "network", "report_peer peer {:?} UnexpectedMessage error {:?}", peer, ret); + } return; } }; @@ -128,7 +133,10 @@ impl CKBProtocolHandler for NetTimeProtocol { let fbb = &mut FlatBufferBuilder::new(); let message = TimeMessage::build_time(fbb, now); fbb.finish(message, None); - let _ = nc.send(peer, fbb.finished_data().to_vec()); + let ret = nc.send(peer, fbb.finished_data().to_vec()); + if ret.is_err() { + warn!(target: "network", "NetTimeProtocol connected init msg send error {:?}", ret); + } } } fn disconnected(&self, _nc: Box, _peer: PeerIndex) {} diff --git a/sync/src/relayer/block_proposal_process.rs b/sync/src/relayer/block_proposal_process.rs index 4f0c294c13..2837687208 100644 --- a/sync/src/relayer/block_proposal_process.rs +++ b/sync/src/relayer/block_proposal_process.rs @@ -4,6 +4,7 @@ use ckb_shared::index::ChainIndex; use ckb_traits::chain_provider::ChainProvider; use ckb_util::TryInto; use failure::Error as FailureError; +use log::warn; pub struct BlockProposalProcess<'a, CI: ChainIndex + 'a> { message: &'a BlockProposal<'a>, @@ -22,10 +23,13 @@ where let chain_state = self.relayer.shared.chain_state().lock(); let txs = FlatbuffersVectorIterator::new(cast!(self.message.transactions())?); for tx in txs { - let _ = chain_state.add_tx_to_pool( + let ret = chain_state.add_tx_to_pool( TryInto::try_into(tx)?, self.relayer.shared.consensus().max_block_cycles(), ); + if ret.is_err() { + warn!(target: "relay", "BlockProposal add_tx_to_pool error {:?}", ret) + } } Ok(()) } diff --git a/sync/src/relayer/compact_block_process.rs b/sync/src/relayer/compact_block_process.rs index 673b1178f5..38283b8521 100644 --- a/sync/src/relayer/compact_block_process.rs +++ b/sync/src/relayer/compact_block_process.rs @@ -10,6 +10,7 @@ use ckb_verification::{HeaderResolverWrapper, HeaderVerifier, Verifier}; use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; use fnv::FnvHashMap; +use log::warn; use numext_fixed_hash::H256; use std::sync::Arc; @@ -94,7 +95,11 @@ where .collect::>(), ); fbb.finish(message, None); - let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); + let ret = self.nc.send(self.peer, fbb.finished_data().to_vec()); + + if ret.is_err() { + warn!(target: "relay", "CompactBlockProcess relay error {:?}", ret); + } } Ok(()) } diff --git a/sync/src/relayer/get_block_proposal_process.rs b/sync/src/relayer/get_block_proposal_process.rs index 2903e86d7d..e5d02da5b4 100644 --- a/sync/src/relayer/get_block_proposal_process.rs +++ b/sync/src/relayer/get_block_proposal_process.rs @@ -5,6 +5,7 @@ use ckb_shared::index::ChainIndex; use ckb_util::TryInto; use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; +use log::warn; pub struct GetBlockProposalProcess<'a, CI: ChainIndex + 'a> { message: &'a GetBlockProposal<'a>, @@ -62,7 +63,10 @@ where let message = RelayMessage::build_block_proposal(fbb, &transactions); fbb.finish(message, None); - let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); + let ret = self.nc.send(self.peer, fbb.finished_data().to_vec()); + if ret.is_err() { + warn!(target: "relay", "GetBlockProposalProcess response error {:?}", ret); + } Ok(()) } } diff --git a/sync/src/relayer/get_block_transactions_process.rs b/sync/src/relayer/get_block_transactions_process.rs index 125d96ce35..f8e1482f83 100644 --- a/sync/src/relayer/get_block_transactions_process.rs +++ b/sync/src/relayer/get_block_transactions_process.rs @@ -5,7 +5,7 @@ use ckb_shared::index::ChainIndex; use ckb_util::TryInto; use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; -use log::debug; +use log::{debug, warn}; pub struct GetBlockTransactionsProcess<'a, CI: ChainIndex + 'a> { message: &'a GetBlockTransactions<'a>, @@ -49,7 +49,10 @@ where let message = RelayMessage::build_block_transactions(fbb, &hash, &transactions); fbb.finish(message, None); - let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); + let ret = self.nc.send(self.peer, fbb.finished_data().to_vec()); + if ret.is_err() { + warn!(target: "relay", "GetBlockTransactionsProcess response error {:?}", ret); + } } Ok(()) diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index 7797c45231..88dae3673a 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -33,6 +33,7 @@ use ckb_util::Mutex; use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; use fnv::{FnvHashMap, FnvHashSet}; +use log::warn; use log::{debug, info}; use numext_fixed_hash::H256; use std::collections::HashSet; @@ -128,7 +129,11 @@ where fn process(&self, nc: &mut CKBProtocolContext, peer: PeerIndex, message: RelayMessage) { if self.try_process(nc, peer, message).is_err() { - let _ = nc.report_peer(peer, Behaviour::UnexpectedMessage); + let ret = nc.report_peer(peer, Behaviour::UnexpectedMessage); + + if ret.is_err() { + warn!(target: "network", "report_peer peer {:?} UnexpectedMessage error {:?}", peer, ret); + } } } @@ -158,7 +163,10 @@ where RelayMessage::build_get_block_proposal(fbb, block.header.number(), &unknown_ids); fbb.finish(message, None); - let _ = nc.send(peer, fbb.finished_data().to_vec()); + let ret = nc.send(peer, fbb.finished_data().to_vec()); + if ret.is_err() { + warn!(target: "relay", "relay get_block_proposal error {:?}", ret); + } } pub fn accept_block(&self, nc: &mut CKBProtocolContext, peer: PeerIndex, block: &Arc) { @@ -170,7 +178,10 @@ where for peer_id in nc.connected_peers() { if peer_id != peer { - let _ = nc.send(peer_id, fbb.finished_data().to_vec()); + let ret = nc.send(peer_id, fbb.finished_data().to_vec()); + if ret.is_err() { + warn!(target: "relay", "relay compact_block error {:?}", ret); + } } } } else { @@ -279,7 +290,11 @@ where RelayMessage::build_block_proposal(fbb, &txs.into_iter().collect::>()); fbb.finish(message, None); - let _ = nc.send(peer, fbb.finished_data().to_vec()); + let ret = nc.send(peer, fbb.finished_data().to_vec()); + + if ret.is_err() { + warn!(target: "relay", "send block_proposal error {:?}", ret); + } } } @@ -305,7 +320,10 @@ where Ok(msg) => msg, _ => { info!(target: "sync", "Peer {} sends us a malformed message", peer); - let _ = nc.report_peer(peer, Behaviour::UnexpectedMessage); + let ret = nc.report_peer(peer, Behaviour::UnexpectedMessage); + if ret.is_err() { + warn!(target: "network", "report_peer peer {:?} UnexpectedMessage error {:?}", peer, ret); + } return; } }; diff --git a/sync/src/relayer/transaction_process.rs b/sync/src/relayer/transaction_process.rs index cf9f6ab45d..1903d76fa8 100644 --- a/sync/src/relayer/transaction_process.rs +++ b/sync/src/relayer/transaction_process.rs @@ -9,7 +9,7 @@ use ckb_util::TryInto; use ckb_verification::TransactionError; use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; -use log::debug; +use log::{debug, warn}; use std::time::Duration; const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3); @@ -64,7 +64,11 @@ where .get(&peer) .map_or(true, |filter| filter.contains(&tx)) { - let _ = self.nc.send(peer, fbb.finished_data().to_vec()); + let ret = self.nc.send(peer, fbb.finished_data().to_vec()); + + if ret.is_err() { + warn!(target: "relay", "relay Transaction error {:?}", ret); + } } } } diff --git a/sync/src/synchronizer/get_blocks_process.rs b/sync/src/synchronizer/get_blocks_process.rs index fc81ec9142..3ec64179cf 100644 --- a/sync/src/synchronizer/get_blocks_process.rs +++ b/sync/src/synchronizer/get_blocks_process.rs @@ -5,7 +5,7 @@ use ckb_shared::index::ChainIndex; use ckb_util::TryInto; use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; -use log::debug; +use log::{debug, warn}; pub struct GetBlocksProcess<'a, CI: ChainIndex + 'a> { message: &'a GetBlocks<'a>, @@ -59,12 +59,18 @@ where let message = SyncMessage::build_filtered_block(fbb, &block, &transactions_index); fbb.finish(message, None); - let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); + let ret = self.nc.send(self.peer, fbb.finished_data().to_vec()); + if ret.is_err() { + warn!(target: "relay", "response GetBlocks error {:?}", ret); + } } else { let fbb = &mut FlatBufferBuilder::new(); let message = SyncMessage::build_block(fbb, &block); fbb.finish(message, None); - let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); + let ret = self.nc.send(self.peer, fbb.finished_data().to_vec()); + if ret.is_err() { + warn!(target: "relay", "response GetBlocks error {:?}", ret); + } } } else { // TODO response not found diff --git a/sync/src/synchronizer/get_headers_process.rs b/sync/src/synchronizer/get_headers_process.rs index 4f9c1d869f..1ce91b5265 100644 --- a/sync/src/synchronizer/get_headers_process.rs +++ b/sync/src/synchronizer/get_headers_process.rs @@ -71,12 +71,20 @@ where let fbb = &mut FlatBufferBuilder::new(); let message = SyncMessage::build_headers(fbb, &headers); fbb.finish(message, None); - let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); + let ret = self.nc.send(self.peer, fbb.finished_data().to_vec()); + + if ret.is_err() { + warn!(target: "sync", "response GetHeaders error {:?}", ret); + } } else { warn!(target: "sync", "\n\nunknown block headers from peer {} {:#?}\n\n", self.peer, block_locator_hashes); // Got 'headers' message without known blocks // ban or close peers - let _ = self.nc.report_peer(self.peer, Behaviour::SyncUseless); + let report_ret = self.nc.report_peer(self.peer, Behaviour::SyncUseless); + + if report_ret.is_err() { + warn!(target: "sync", "report behaviour SyncUseless error {:?}", report_ret); + } // disconnect peer anyway self.nc.disconnect(self.peer); } diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 9fdff61699..8f31860a28 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -182,7 +182,10 @@ impl Synchronizer { fn process(&self, nc: &mut CKBProtocolContext, peer: PeerIndex, message: SyncMessage) { if self.try_process(nc, peer, message).is_err() { - let _ = nc.report_peer(peer, Behaviour::UnexpectedMessage); + let ret = nc.report_peer(peer, Behaviour::UnexpectedMessage); + if ret.is_err() { + warn!(target: "network", "report_peer peer {:?} UnexpectedMessage error {:?}", peer, ret); + } } } @@ -537,7 +540,11 @@ impl Synchronizer { let fbb = &mut FlatBufferBuilder::new(); let message = SyncMessage::build_get_headers(fbb, &locator_hash); fbb.finish(message, None); - let _ = nc.send(peer, fbb.finished_data().to_vec()); + let ret = nc.send(peer, fbb.finished_data().to_vec()); + + if ret.is_err() { + warn!(target: "sync", "send_getheaders_to_peer error {:?}", ret); + } } // - If at timeout their best known block now has more work than our tip @@ -699,7 +706,10 @@ impl Synchronizer { let fbb = &mut FlatBufferBuilder::new(); let message = SyncMessage::build_get_blocks(fbb, v_fetch); fbb.finish(message, None); - let _ = nc.send(peer, fbb.finished_data().to_vec()); + let ret = nc.send(peer, fbb.finished_data().to_vec()); + if ret.is_err() { + warn!(target: "sync", "send_getblocks error {:?}", ret); + } debug!(target: "sync", "send_getblocks len={:?} to peer={}", v_fetch.len() , peer); } } @@ -720,7 +730,10 @@ where Ok(msg) => msg, _ => { info!(target: "sync", "Peer {} sends us a malformed message", peer); - let _ = nc.report_peer(peer, Behaviour::UnexpectedMessage); + let ret = nc.report_peer(peer, Behaviour::UnexpectedMessage); + if ret.is_err() { + warn!(target: "sync", "report_peer peer {:?} UnexpectedMessage error {:?}", peer, ret) + } return; } };