Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Deduplicating crate dependencies (part 2 of n, slab) (#11613)
Browse files Browse the repository at this point in the history
The change includes only `slab` module.
  • Loading branch information
shergin committed Apr 7, 2020
1 parent c92a15d commit 12cbe93
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 37 deletions.
8 changes: 1 addition & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion util/io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mio = { version = "0.6.19", optional = true }
crossbeam-deque = "0.7.3"
parking_lot = "0.10.0"
log = "0.4"
slab = "0.4"
slab = "0.4.2"
num_cpus = "1.8"
timer = "0.2"
time = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion util/network-devp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ rlp = "0.4.0"
secp256k1 = "0.17"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
slab = "0.2"
slab = "0.4.2"
tiny-keccak = "1.4"

[dev-dependencies]
Expand Down
52 changes: 24 additions & 28 deletions util/network-devp2p/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::time::Duration;
use slab::Slab;

use ethereum_types::H256;
use keccak_hash::keccak;
Expand Down Expand Up @@ -55,8 +56,6 @@ use crate::{
session::{Session, SessionData}
};

type Slab<T> = ::slab::Slab<T, usize>;

const MAX_SESSIONS: usize = 2048 + MAX_HANDSHAKES;
const MAX_HANDSHAKES: usize = 1024;

Expand Down Expand Up @@ -153,7 +152,7 @@ impl<'s> NetworkContextTrait for NetworkContext<'s> {
let session = self.resolve_session(peer);
if let Some(session) = session {
session.lock().send_packet(self.io, Some(protocol), packet_id as u8, &data)?;
} else {
} else {
trace!(target: "network", "Send: Peer no longer exist")
}
Ok(())
Expand Down Expand Up @@ -339,7 +338,7 @@ impl Host {
discovery: Mutex::new(None),
udp_socket: Mutex::new(None),
tcp_listener: Mutex::new(tcp_listener),
sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))),
sessions: Arc::new(RwLock::new(Slab::with_capacity(MAX_SESSIONS))),
nodes: RwLock::new(NodeTable::new(path)),
handlers: RwLock::new(HashMap::new()),
timers: RwLock::new(HashMap::new()),
Expand Down Expand Up @@ -399,7 +398,7 @@ impl Host {
// disconnect all non-reserved peers here.
let reserved: HashSet<NodeId> = self.reserved_nodes.read().clone();
let mut to_kill = Vec::new();
for e in self.sessions.read().iter() {
for (_, e) in self.sessions.read().iter() {
let mut s = e.lock();
{
let id = s.id();
Expand Down Expand Up @@ -439,7 +438,7 @@ impl Host {
pub fn stop(&self, io: &IoContext<NetworkIoMessage>) {
self.stopping.store(true, AtomicOrdering::Release);
let mut to_kill = Vec::new();
for e in self.sessions.read().iter() {
for (_, e) in self.sessions.read().iter() {
let mut s = e.lock();
s.disconnect(io, DisconnectReason::ClientQuit);
to_kill.push(s.token());
Expand All @@ -456,7 +455,7 @@ impl Host {
let sessions = self.sessions.read();
let sessions = &*sessions;

let mut peers = Vec::with_capacity(sessions.count());
let mut peers = Vec::with_capacity(sessions.len());
for i in (0..MAX_SESSIONS).map(|x| x + FIRST_SESSION) {
if sessions.get(i).is_some() {
peers.push(i);
Expand Down Expand Up @@ -533,15 +532,15 @@ impl Host {
}

fn have_session(&self, id: &NodeId) -> bool {
self.sessions.read().iter().any(|e| e.lock().info.id == Some(*id))
self.sessions.read().iter().any(|(_, e)| e.lock().info.id == Some(*id))
}

// returns (handshakes, egress, ingress)
fn session_count(&self) -> (usize, usize, usize) {
let mut handshakes = 0;
let mut egress = 0;
let mut ingress = 0;
for s in self.sessions.read().iter() {
for (_, s) in self.sessions.read().iter() {
match s.try_lock() {
Some(ref s) if s.is_ready() && s.info.originated => egress += 1,
Some(ref s) if s.is_ready() && !s.info.originated => ingress += 1,
Expand All @@ -552,12 +551,12 @@ impl Host {
}

fn connecting_to(&self, id: &NodeId) -> bool {
self.sessions.read().iter().any(|e| e.lock().id() == Some(id))
self.sessions.read().iter().any(|(_, e)| e.lock().id() == Some(id))
}

fn keep_alive(&self, io: &IoContext<NetworkIoMessage>) {
let mut to_kill = Vec::new();
for e in self.sessions.read().iter() {
for (_, e) in self.sessions.read().iter() {
let mut s = e.lock();
if !s.keep_alive(io) {
s.disconnect(io, DisconnectReason::PingTimeout);
Expand Down Expand Up @@ -674,21 +673,18 @@ impl Host {
let nonce = self.info.write().next_nonce();
let mut sessions = self.sessions.write();

let token = sessions.insert_with_opt(|token| {
trace!(target: "network", "{}: Initiating session {:?}", token, id);
match Session::new(io, socket, token, id, &nonce, &self.info.read()) {
Ok(s) => Some(Arc::new(Mutex::new(s))),
Err(e) => {
debug!(target: "network", "Session create error: {:?}", e);
None
}
}
});
let entry = sessions.vacant_entry();
let key = entry.key();

match token {
Some(t) => io.register_stream(t).map(|_| ()).map_err(Into::into),
None => {
debug!(target: "network", "Max sessions reached");
trace!(target: "network", "{}: Initiating session {:?}", key, id);

match Session::new(io, socket, key, id, &nonce, &self.info.read()) {
Ok(session) => {
entry.insert(Arc::new(Mutex::new(session)));
io.register_stream(key).map(|_| ()).map_err(Into::into)
},
Err(e) => {
debug!(target: "network", "Session create error: {:?}", e);
Ok(())
}
}
Expand Down Expand Up @@ -854,7 +850,7 @@ impl Host {

let handlers = self.handlers.read();
if !ready_data.is_empty() {
let duplicate = self.sessions.read().iter().any(|e| {
let duplicate = self.sessions.read().iter().any(|(_, e)| {
let session = e.lock();
session.token() != token && session.info.id == ready_id
});
Expand Down Expand Up @@ -961,7 +957,7 @@ impl Host {
if !s.expired() {
if s.is_ready() {
for (p, _) in self.handlers.read().iter() {
if s.have_capability(*p) {
if s.have_capability(*p) {
to_disconnect.push(*p);
}
}
Expand Down Expand Up @@ -992,7 +988,7 @@ impl Host {
let mut to_remove: Vec<PeerId> = Vec::new();
{
let sessions = self.sessions.read();
for c in sessions.iter() {
for (_, c) in sessions.iter() {
let s = c.lock();
if let Some(id) = s.id() {
if node_changes.removed.contains(id) {
Expand Down

0 comments on commit 12cbe93

Please sign in to comment.