Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: avoid recursive lock #392

Merged
merged 3 commits into from Apr 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/script.rs
Expand Up @@ -25,7 +25,7 @@ fn prefix_hex(bytes: &[u8]) -> String {
let mut dst = vec![0u8; bytes.len() * 2 + 2];
dst[0] = b'0';
dst[1] = b'x';
let _ = hex_encode(bytes, &mut dst[2..]);
hex_encode(bytes, &mut dst[2..]).expect("hex encode buffer checked");
unsafe { String::from_utf8_unchecked(dst) }
}

Expand Down
31 changes: 10 additions & 21 deletions network/src/benches/sqlite_peer_store.rs
Expand Up @@ -8,16 +8,14 @@ use ckb_network::{
peer_store::{PeerStore, SqlitePeerStore},
PeerId, SessionType,
};
use ckb_util::Mutex;
use criterion::Criterion;
use std::rc::Rc;

fn insert_peer_info_benchmark(c: &mut Criterion) {
c.bench_function("insert 100 peer_info", |b| {
b.iter({
let mut peer_store =
SqlitePeerStore::memory("bench_db_insert_100_peer_info".to_string())
.expect("memory");
let peer_store = SqlitePeerStore::memory("bench_db_insert_100_peer_info".to_string())
.expect("memory");
let peer_ids = (0..100).map(|_| PeerId::random()).collect::<Vec<_>>();
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
move || {
Expand All @@ -29,9 +27,8 @@ fn insert_peer_info_benchmark(c: &mut Criterion) {
});
c.bench_function("insert 1000 peer_info", |b| {
b.iter({
let mut peer_store =
SqlitePeerStore::memory("bench_db_insert_1000_peer_info".to_string())
.expect("memory");
let peer_store = SqlitePeerStore::memory("bench_db_insert_1000_peer_info".to_string())
.expect("memory");
let peer_ids = (0..1000).map(|_| PeerId::random()).collect::<Vec<_>>();
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
move || {
Expand All @@ -45,7 +42,7 @@ fn insert_peer_info_benchmark(c: &mut Criterion) {
// filesystem benchmark
c.bench_function("insert 100 peer_info on filesystem", move |b| {
b.iter({
let mut peer_store = SqlitePeerStore::temp().expect("temp");
let peer_store = SqlitePeerStore::temp().expect("temp");
let peer_ids = (0..100).map(|_| PeerId::random()).collect::<Vec<_>>();
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
move || {
Expand All @@ -59,12 +56,10 @@ fn insert_peer_info_benchmark(c: &mut Criterion) {

fn random_order_benchmark(c: &mut Criterion) {
{
let peer_store = Rc::new(Mutex::new(
SqlitePeerStore::memory("bench_db_random_order".to_string()).expect("memory"),
));
let peer_store =
Rc::new(SqlitePeerStore::memory("bench_db_random_order".to_string()).expect("memory"));
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
{
let mut peer_store = peer_store.lock();
for _ in 0..8000 {
let peer_id = PeerId::random();
peer_store.add_connected_peer(&peer_id, addr.clone(), SessionType::Outbound);
Expand All @@ -79,10 +74,7 @@ fn random_order_benchmark(c: &mut Criterion) {
move || {
let peer_store = Rc::clone(&peer_store);
let count = 1000;
assert_eq!(
peer_store.lock().peers_to_attempt(count).len() as u32,
count
);
assert_eq!(peer_store.peers_to_attempt(count).len() as u32, count);
}
})
}
Expand All @@ -95,10 +87,7 @@ fn random_order_benchmark(c: &mut Criterion) {
move || {
let peer_store = Rc::clone(&peer_store);
let count = 2000;
assert_eq!(
peer_store.lock().peers_to_attempt(count).len() as u32,
count
);
assert_eq!(peer_store.peers_to_attempt(count).len() as u32, count);
}
})
}
Expand All @@ -110,7 +99,7 @@ fn random_order_benchmark(c: &mut Criterion) {
"random order 1000 / 8000 peer_info on filesystem",
move |b| {
b.iter({
let mut peer_store = SqlitePeerStore::temp().expect("temp");
let peer_store = SqlitePeerStore::temp().expect("temp");
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
for _ in 0..8000 {
let peer_id = PeerId::random();
Expand Down
138 changes: 59 additions & 79 deletions network/src/network.rs
@@ -1,4 +1,4 @@
use crate::errors::{Error, ProtocolError};
use crate::errors::Error;
use crate::peer_store::{sqlite::SqlitePeerStore, PeerStore, Status};
use crate::peers_registry::{ConnectionStatus, PeersRegistry, RegisterResult};
use crate::protocols::{
Expand Down Expand Up @@ -60,8 +60,8 @@ pub struct SessionInfo {

pub struct NetworkState {
protocol_ids: RwLock<FnvHashSet<ProtocolId>>,
pub(crate) peers_registry: RwLock<PeersRegistry>,
peer_store: Arc<RwLock<dyn PeerStore>>,
pub(crate) peers_registry: PeersRegistry,
peer_store: Arc<dyn PeerStore>,
listened_addresses: RwLock<FnvHashMap<Multiaddr, u8>>,
pub(crate) original_listened_addresses: RwLock<Vec<Multiaddr>>,
// For avoid repeat failed dial
Expand All @@ -82,14 +82,14 @@ impl NetworkState {
.chain(config.public_addresses.iter())
.map(|addr| (addr.to_owned(), std::u8::MAX))
.collect();
let peer_store: Arc<RwLock<dyn PeerStore>> = {
let mut peer_store =
let peer_store: Arc<dyn PeerStore> = {
let peer_store =
SqlitePeerStore::file(config.peer_store_path().to_string_lossy().to_string())?;
let bootnodes = config.bootnodes()?;
for (peer_id, addr) in bootnodes {
peer_store.add_bootnode(peer_id, addr);
}
Arc::new(RwLock::new(peer_store))
Arc::new(peer_store)
};

let reserved_peers = config
Expand All @@ -108,8 +108,8 @@ impl NetworkState {
Ok(NetworkState {
peer_store,
config,
peers_registry,
failed_dials: RwLock::new(LruCache::new(FAILED_DIAL_CACHE_SIZE)),
peers_registry: RwLock::new(peers_registry),
listened_addresses: RwLock::new(listened_addresses),
original_listened_addresses: RwLock::new(Vec::new()),
local_private_key: local_private_key.clone(),
Expand All @@ -120,12 +120,12 @@ impl NetworkState {

pub fn report(&self, peer_id: &PeerId, behaviour: Behaviour) {
info!(target: "network", "report {:?} because {:?}", peer_id, behaviour);
self.peer_store.write().report(peer_id, behaviour);
self.peer_store.report(peer_id, behaviour);
}

pub fn drop_peer(&self, p2p_control: &mut ServiceControl, peer_id: &PeerId) {
debug!(target: "network", "drop peer {:?}", peer_id);
if let Some(peer) = self.peers_registry.write().drop_peer(&peer_id) {
if let Some(peer) = self.peers_registry.drop_peer(&peer_id) {
if let Err(err) = p2p_control.disconnect(peer.session_id) {
error!(target: "network", "disconnect peer error {:?}", err);
}
Expand All @@ -135,16 +135,17 @@ impl NetworkState {
pub fn drop_all(&self, p2p_control: &mut ServiceControl) {
debug!(target: "network", "drop all connections...");
let mut peer_ids = Vec::new();
let mut peers_registry = self.peers_registry.write();
for (peer_id, peer) in peers_registry.peers_iter() {
peer_ids.push(peer_id.clone());
if let Err(err) = p2p_control.disconnect(peer.session_id) {
error!(target: "network", "disconnect peer error {:?}", err);
{
for (peer_id, peer) in self.peers_registry.peers_guard().read().iter() {
peer_ids.push(peer_id.clone());
if let Err(err) = p2p_control.disconnect(peer.session_id) {
error!(target: "network", "disconnect peer error {:?}", err);
}
}
}
peers_registry.drop_all();
self.peers_registry.drop_all();

let mut peer_store = self.peer_store().write();
let peer_store = self.peer_store();
for peer_id in peer_ids {
if peer_store.peer_status(&peer_id) != Status::Disconnected {
peer_store.report(&peer_id, Behaviour::UnexpectedDisconnect);
Expand All @@ -167,36 +168,30 @@ impl NetworkState {
}

pub(crate) fn get_peer_index(&self, peer_id: &PeerId) -> Option<PeerIndex> {
let peers_registry = self.peers_registry.read();
peers_registry.get(&peer_id).map(|peer| peer.peer_index)
self.peers_registry
.peers_guard()
.read()
.get(&peer_id)
.map(|peer| peer.peer_index)
}

pub(crate) fn get_peer_id(&self, peer_index: PeerIndex) -> Option<PeerId> {
let peers_registry = self.peers_registry.read();
peers_registry
.get_peer_id(peer_index)
.map(|peer_id| peer_id.to_owned())
self.peers_registry.get_peer_id(peer_index)
}

pub(crate) fn connection_status(&self) -> ConnectionStatus {
let peers_registry = self.peers_registry.read();
peers_registry.connection_status()
self.peers_registry.connection_status()
}

pub(crate) fn modify_peer<F>(&self, peer_id: &PeerId, f: F)
where
F: FnOnce(&mut Peer) -> (),
{
let mut peers_registry = self.peers_registry.write();
if let Some(peer) = peers_registry.get_mut(peer_id) {
f(peer);
}
self.peers_registry.modify_peer(peer_id, f);
}

pub(crate) fn peers_indexes(&self) -> Vec<PeerIndex> {
let peers_registry = self.peers_registry.read();
let iter = peers_registry.connected_peers_indexes();
iter.collect::<Vec<_>>()
self.peers_registry.connected_peers_indexes()
}

pub(crate) fn ban_peer(
Expand All @@ -206,10 +201,10 @@ impl NetworkState {
timeout: Duration,
) {
self.drop_peer(p2p_control, peer_id);
self.peer_store.write().ban_peer(peer_id, timeout);
self.peer_store.ban_peer(peer_id, timeout);
}

pub(crate) fn peer_store(&self) -> &RwLock<dyn PeerStore> {
pub(crate) fn peer_store(&self) -> &Arc<dyn PeerStore> {
&self.peer_store
}

Expand All @@ -236,10 +231,9 @@ impl NetworkState {

// A workaround method for `add_node` rpc call, need to re-write it after new p2p lib integration.
pub fn add_node(&self, peer_id: &PeerId, address: Multiaddr) {
let _ = self
.peer_store()
.write()
.add_discovered_addr(peer_id, address);
if !self.peer_store().add_discovered_addr(peer_id, address) {
warn!(target: "network", "add_node failed {:?}", peer_id);
}
}

fn to_external_url(&self, addr: &Multiaddr) -> String {
Expand All @@ -255,54 +249,40 @@ impl NetworkState {
protocol_id: ProtocolId,
protocol_version: ProtocolVersion,
) -> Result<RegisterResult, Error> {
let mut peers_registry = self.peers_registry.write();
let register_result = if session_type.is_outbound() {
peers_registry.try_outbound_peer(
peer_id.clone(),
connected_addr,
session_id,
session_type,
)
} else {
peers_registry.accept_inbound_peer(
peer_id.clone(),
connected_addr,
session_id,
session_type,
)
}?;
// add session to peer
match peers_registry.get_mut(&peer_id) {
Some(peer) => match peer.protocol_version(protocol_id) {
Some(_) => return Err(ProtocolError::Duplicate(protocol_id).into()),
None => {
peer.protocols.insert(protocol_id, protocol_version);
}
},
None => unreachable!("get peer after inserted"),
}
Ok(register_result)
self.peers_registry.accept_connection(
peer_id,
connected_addr,
session_id,
session_type,
protocol_id,
protocol_version,
)
}

pub fn peer_protocol_version(
&self,
peer_id: &PeerId,
protocol_id: ProtocolId,
) -> Option<ProtocolVersion> {
let peers_registry = self.peers_registry.read();
peers_registry
self.peers_registry
.peers_guard()
.read()
.get(peer_id)
.and_then(|peer| peer.protocol_version(protocol_id))
}

pub fn session_info(&self, peer_id: &PeerId, protocol_id: ProtocolId) -> Option<SessionInfo> {
let peers_registry = self.peers_registry.read();
peers_registry.get(peer_id).map(|peer| {
let protocol_version = peer.protocol_version(protocol_id);
SessionInfo {
peer: peer.clone(),
protocol_version,
}
})
self.peers_registry
.peers_guard()
.read()
.get(peer_id)
.map(|peer| {
let protocol_version = peer.protocol_version(protocol_id);
SessionInfo {
peer: peer.clone(),
protocol_version,
}
})
}

pub fn get_protocol_ids<F: Fn(ProtocolId) -> bool>(&self, filter: F) -> Vec<ProtocolId> {
Expand Down Expand Up @@ -398,7 +378,7 @@ impl ServiceHandle for EventHandler {
.map(|pubkey| pubkey.peer_id())
.expect("Secio must enabled");

let mut peer_store = self.network_state.peer_store().write();
let peer_store = self.network_state.peer_store();
if peer_store.peer_status(&peer_id) == Status::Connected {
peer_store.report(&peer_id, Behaviour::UnexpectedDisconnect);
peer_store.update_status(&peer_id, Status::Disconnected);
Expand Down Expand Up @@ -432,7 +412,7 @@ impl ServiceHandle for EventHandler {
Ok(register_result) => {
// update status in peer_store
if let RegisterResult::New(_) = register_result {
let mut peer_store = self.network_state.peer_store().write();
let peer_store = self.network_state.peer_store();
peer_store.update_status(&peer_id, Status::Connected);
}
}
Expand Down Expand Up @@ -596,7 +576,6 @@ impl NetworkService {
let bootnodes = self
.network_state
.peer_store()
.read()
.bootnodes(max((config.max_outbound_peers / 2) as u32, 1))
.clone();
// dial half bootnodes
Expand Down Expand Up @@ -681,12 +660,13 @@ impl NetworkController {
}

pub fn connected_peers(&self) -> Vec<(PeerId, Peer, MultiaddrList)> {
let peer_store = self.network_state.peer_store().read();
let peer_store = self.network_state.peer_store();

self.network_state
.peers_registry
.peers_guard()
.read()
.peers_iter()
.iter()
.map(|(peer_id, peer)| {
(
peer_id.clone(),
Expand Down