Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
392: refactor: avoid recursive lock r=doitian a=zhangsoledad



Co-authored-by: zhangsoledad <787953403@qq.com>
  • Loading branch information
bors[bot] and zhangsoledad committed Apr 10, 2019
2 parents f5a17c1 + 641f146 commit 29c0a4d
Show file tree
Hide file tree
Showing 25 changed files with 504 additions and 407 deletions.
2 changes: 1 addition & 1 deletion core/src/script.rs
Expand Up @@ -24,7 +24,7 @@ fn prefix_hex(bytes: &[u8]) -> String {
let mut dst = vec![0u8; bytes.len() * 2 + 2];
dst[0] = b'0';
dst[1] = b'x';
let _ = hex_encode(bytes, &mut dst[2..]);
hex_encode(bytes, &mut dst[2..]).expect("hex encode buffer checked");
unsafe { String::from_utf8_unchecked(dst) }
}

Expand Down
31 changes: 10 additions & 21 deletions network/src/benches/sqlite_peer_store.rs
Expand Up @@ -8,16 +8,14 @@ use ckb_network::{
peer_store::{PeerStore, SqlitePeerStore},
PeerId, SessionType,
};
use ckb_util::Mutex;
use criterion::Criterion;
use std::rc::Rc;

fn insert_peer_info_benchmark(c: &mut Criterion) {
c.bench_function("insert 100 peer_info", |b| {
b.iter({
let mut peer_store =
SqlitePeerStore::memory("bench_db_insert_100_peer_info".to_string())
.expect("memory");
let peer_store = SqlitePeerStore::memory("bench_db_insert_100_peer_info".to_string())
.expect("memory");
let peer_ids = (0..100).map(|_| PeerId::random()).collect::<Vec<_>>();
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
move || {
Expand All @@ -29,9 +27,8 @@ fn insert_peer_info_benchmark(c: &mut Criterion) {
});
c.bench_function("insert 1000 peer_info", |b| {
b.iter({
let mut peer_store =
SqlitePeerStore::memory("bench_db_insert_1000_peer_info".to_string())
.expect("memory");
let peer_store = SqlitePeerStore::memory("bench_db_insert_1000_peer_info".to_string())
.expect("memory");
let peer_ids = (0..1000).map(|_| PeerId::random()).collect::<Vec<_>>();
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
move || {
Expand All @@ -45,7 +42,7 @@ fn insert_peer_info_benchmark(c: &mut Criterion) {
// filesystem benchmark
c.bench_function("insert 100 peer_info on filesystem", move |b| {
b.iter({
let mut peer_store = SqlitePeerStore::temp().expect("temp");
let peer_store = SqlitePeerStore::temp().expect("temp");
let peer_ids = (0..100).map(|_| PeerId::random()).collect::<Vec<_>>();
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
move || {
Expand All @@ -59,12 +56,10 @@ fn insert_peer_info_benchmark(c: &mut Criterion) {

fn random_order_benchmark(c: &mut Criterion) {
{
let peer_store = Rc::new(Mutex::new(
SqlitePeerStore::memory("bench_db_random_order".to_string()).expect("memory"),
));
let peer_store =
Rc::new(SqlitePeerStore::memory("bench_db_random_order".to_string()).expect("memory"));
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
{
let mut peer_store = peer_store.lock();
for _ in 0..8000 {
let peer_id = PeerId::random();
peer_store.add_connected_peer(&peer_id, addr.clone(), SessionType::Outbound);
Expand All @@ -79,10 +74,7 @@ fn random_order_benchmark(c: &mut Criterion) {
move || {
let peer_store = Rc::clone(&peer_store);
let count = 1000;
assert_eq!(
peer_store.lock().peers_to_attempt(count).len() as u32,
count
);
assert_eq!(peer_store.peers_to_attempt(count).len() as u32, count);
}
})
}
Expand All @@ -95,10 +87,7 @@ fn random_order_benchmark(c: &mut Criterion) {
move || {
let peer_store = Rc::clone(&peer_store);
let count = 2000;
assert_eq!(
peer_store.lock().peers_to_attempt(count).len() as u32,
count
);
assert_eq!(peer_store.peers_to_attempt(count).len() as u32, count);
}
})
}
Expand All @@ -110,7 +99,7 @@ fn random_order_benchmark(c: &mut Criterion) {
"random order 1000 / 8000 peer_info on filesystem",
move |b| {
b.iter({
let mut peer_store = SqlitePeerStore::temp().expect("temp");
let peer_store = SqlitePeerStore::temp().expect("temp");
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
for _ in 0..8000 {
let peer_id = PeerId::random();
Expand Down
138 changes: 59 additions & 79 deletions network/src/network.rs
@@ -1,4 +1,4 @@
use crate::errors::{Error, ProtocolError};
use crate::errors::Error;
use crate::peer_store::{sqlite::SqlitePeerStore, PeerStore, Status};
use crate::peers_registry::{ConnectionStatus, PeersRegistry, RegisterResult};
use crate::protocols::{
Expand Down Expand Up @@ -60,8 +60,8 @@ pub struct SessionInfo {

pub struct NetworkState {
protocol_ids: RwLock<FnvHashSet<ProtocolId>>,
pub(crate) peers_registry: RwLock<PeersRegistry>,
peer_store: Arc<RwLock<dyn PeerStore>>,
pub(crate) peers_registry: PeersRegistry,
peer_store: Arc<dyn PeerStore>,
listened_addresses: RwLock<FnvHashMap<Multiaddr, u8>>,
pub(crate) original_listened_addresses: RwLock<Vec<Multiaddr>>,
// For avoid repeat failed dial
Expand All @@ -82,14 +82,14 @@ impl NetworkState {
.chain(config.public_addresses.iter())
.map(|addr| (addr.to_owned(), std::u8::MAX))
.collect();
let peer_store: Arc<RwLock<dyn PeerStore>> = {
let mut peer_store =
let peer_store: Arc<dyn PeerStore> = {
let peer_store =
SqlitePeerStore::file(config.peer_store_path().to_string_lossy().to_string())?;
let bootnodes = config.bootnodes()?;
for (peer_id, addr) in bootnodes {
peer_store.add_bootnode(peer_id, addr);
}
Arc::new(RwLock::new(peer_store))
Arc::new(peer_store)
};

let reserved_peers = config
Expand All @@ -108,8 +108,8 @@ impl NetworkState {
Ok(NetworkState {
peer_store,
config,
peers_registry,
failed_dials: RwLock::new(LruCache::new(FAILED_DIAL_CACHE_SIZE)),
peers_registry: RwLock::new(peers_registry),
listened_addresses: RwLock::new(listened_addresses),
original_listened_addresses: RwLock::new(Vec::new()),
local_private_key: local_private_key.clone(),
Expand All @@ -120,12 +120,12 @@ impl NetworkState {

pub fn report(&self, peer_id: &PeerId, behaviour: Behaviour) {
info!(target: "network", "report {:?} because {:?}", peer_id, behaviour);
self.peer_store.write().report(peer_id, behaviour);
self.peer_store.report(peer_id, behaviour);
}

pub fn drop_peer(&self, p2p_control: &mut ServiceControl, peer_id: &PeerId) {
debug!(target: "network", "drop peer {:?}", peer_id);
if let Some(peer) = self.peers_registry.write().drop_peer(&peer_id) {
if let Some(peer) = self.peers_registry.drop_peer(&peer_id) {
if let Err(err) = p2p_control.disconnect(peer.session_id) {
error!(target: "network", "disconnect peer error {:?}", err);
}
Expand All @@ -135,16 +135,17 @@ impl NetworkState {
pub fn drop_all(&self, p2p_control: &mut ServiceControl) {
debug!(target: "network", "drop all connections...");
let mut peer_ids = Vec::new();
let mut peers_registry = self.peers_registry.write();
for (peer_id, peer) in peers_registry.peers_iter() {
peer_ids.push(peer_id.clone());
if let Err(err) = p2p_control.disconnect(peer.session_id) {
error!(target: "network", "disconnect peer error {:?}", err);
{
for (peer_id, peer) in self.peers_registry.peers_guard().read().iter() {
peer_ids.push(peer_id.clone());
if let Err(err) = p2p_control.disconnect(peer.session_id) {
error!(target: "network", "disconnect peer error {:?}", err);
}
}
}
peers_registry.drop_all();
self.peers_registry.drop_all();

let mut peer_store = self.peer_store().write();
let peer_store = self.peer_store();
for peer_id in peer_ids {
if peer_store.peer_status(&peer_id) != Status::Disconnected {
peer_store.report(&peer_id, Behaviour::UnexpectedDisconnect);
Expand All @@ -167,36 +168,30 @@ impl NetworkState {
}

pub(crate) fn get_peer_index(&self, peer_id: &PeerId) -> Option<PeerIndex> {
let peers_registry = self.peers_registry.read();
peers_registry.get(&peer_id).map(|peer| peer.peer_index)
self.peers_registry
.peers_guard()
.read()
.get(&peer_id)
.map(|peer| peer.peer_index)
}

pub(crate) fn get_peer_id(&self, peer_index: PeerIndex) -> Option<PeerId> {
let peers_registry = self.peers_registry.read();
peers_registry
.get_peer_id(peer_index)
.map(|peer_id| peer_id.to_owned())
self.peers_registry.get_peer_id(peer_index)
}

pub(crate) fn connection_status(&self) -> ConnectionStatus {
let peers_registry = self.peers_registry.read();
peers_registry.connection_status()
self.peers_registry.connection_status()
}

pub(crate) fn modify_peer<F>(&self, peer_id: &PeerId, f: F)
where
F: FnOnce(&mut Peer) -> (),
{
let mut peers_registry = self.peers_registry.write();
if let Some(peer) = peers_registry.get_mut(peer_id) {
f(peer);
}
self.peers_registry.modify_peer(peer_id, f);
}

pub(crate) fn peers_indexes(&self) -> Vec<PeerIndex> {
let peers_registry = self.peers_registry.read();
let iter = peers_registry.connected_peers_indexes();
iter.collect::<Vec<_>>()
self.peers_registry.connected_peers_indexes()
}

pub(crate) fn ban_peer(
Expand All @@ -206,10 +201,10 @@ impl NetworkState {
timeout: Duration,
) {
self.drop_peer(p2p_control, peer_id);
self.peer_store.write().ban_peer(peer_id, timeout);
self.peer_store.ban_peer(peer_id, timeout);
}

pub(crate) fn peer_store(&self) -> &RwLock<dyn PeerStore> {
pub(crate) fn peer_store(&self) -> &Arc<dyn PeerStore> {
&self.peer_store
}

Expand All @@ -236,10 +231,9 @@ impl NetworkState {

// A workaround method for `add_node` rpc call, need to re-write it after new p2p lib integration.
pub fn add_node(&self, peer_id: &PeerId, address: Multiaddr) {
let _ = self
.peer_store()
.write()
.add_discovered_addr(peer_id, address);
if !self.peer_store().add_discovered_addr(peer_id, address) {
warn!(target: "network", "add_node failed {:?}", peer_id);
}
}

fn to_external_url(&self, addr: &Multiaddr) -> String {
Expand All @@ -255,54 +249,40 @@ impl NetworkState {
protocol_id: ProtocolId,
protocol_version: ProtocolVersion,
) -> Result<RegisterResult, Error> {
let mut peers_registry = self.peers_registry.write();
let register_result = if session_type.is_outbound() {
peers_registry.try_outbound_peer(
peer_id.clone(),
connected_addr,
session_id,
session_type,
)
} else {
peers_registry.accept_inbound_peer(
peer_id.clone(),
connected_addr,
session_id,
session_type,
)
}?;
// add session to peer
match peers_registry.get_mut(&peer_id) {
Some(peer) => match peer.protocol_version(protocol_id) {
Some(_) => return Err(ProtocolError::Duplicate(protocol_id).into()),
None => {
peer.protocols.insert(protocol_id, protocol_version);
}
},
None => unreachable!("get peer after inserted"),
}
Ok(register_result)
self.peers_registry.accept_connection(
peer_id,
connected_addr,
session_id,
session_type,
protocol_id,
protocol_version,
)
}

pub fn peer_protocol_version(
&self,
peer_id: &PeerId,
protocol_id: ProtocolId,
) -> Option<ProtocolVersion> {
let peers_registry = self.peers_registry.read();
peers_registry
self.peers_registry
.peers_guard()
.read()
.get(peer_id)
.and_then(|peer| peer.protocol_version(protocol_id))
}

pub fn session_info(&self, peer_id: &PeerId, protocol_id: ProtocolId) -> Option<SessionInfo> {
let peers_registry = self.peers_registry.read();
peers_registry.get(peer_id).map(|peer| {
let protocol_version = peer.protocol_version(protocol_id);
SessionInfo {
peer: peer.clone(),
protocol_version,
}
})
self.peers_registry
.peers_guard()
.read()
.get(peer_id)
.map(|peer| {
let protocol_version = peer.protocol_version(protocol_id);
SessionInfo {
peer: peer.clone(),
protocol_version,
}
})
}

pub fn get_protocol_ids<F: Fn(ProtocolId) -> bool>(&self, filter: F) -> Vec<ProtocolId> {
Expand Down Expand Up @@ -398,7 +378,7 @@ impl ServiceHandle for EventHandler {
.map(|pubkey| pubkey.peer_id())
.expect("Secio must enabled");

let mut peer_store = self.network_state.peer_store().write();
let peer_store = self.network_state.peer_store();
if peer_store.peer_status(&peer_id) == Status::Connected {
peer_store.report(&peer_id, Behaviour::UnexpectedDisconnect);
peer_store.update_status(&peer_id, Status::Disconnected);
Expand Down Expand Up @@ -432,7 +412,7 @@ impl ServiceHandle for EventHandler {
Ok(register_result) => {
// update status in peer_store
if let RegisterResult::New(_) = register_result {
let mut peer_store = self.network_state.peer_store().write();
let peer_store = self.network_state.peer_store();
peer_store.update_status(&peer_id, Status::Connected);
}
}
Expand Down Expand Up @@ -596,7 +576,6 @@ impl NetworkService {
let bootnodes = self
.network_state
.peer_store()
.read()
.bootnodes(max((config.max_outbound_peers / 2) as u32, 1))
.clone();
// dial half bootnodes
Expand Down Expand Up @@ -681,12 +660,13 @@ impl NetworkController {
}

pub fn connected_peers(&self) -> Vec<(PeerId, Peer, MultiaddrList)> {
let peer_store = self.network_state.peer_store().read();
let peer_store = self.network_state.peer_store();

self.network_state
.peers_registry
.peers_guard()
.read()
.peers_iter()
.iter()
.map(|(peer_id, peer)| {
(
peer_id.clone(),
Expand Down

0 comments on commit 29c0a4d

Please sign in to comment.