Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Deduplicating crate dependencies (part 2 of n, slab) #11613

Merged
merged 1 commit into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion util/io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mio = { version = "0.6.19", optional = true }
crossbeam-deque = "0.7.3"
parking_lot = "0.10.0"
log = "0.4"
slab = "0.4"
slab = "0.4.2"
num_cpus = "1.8"
timer = "0.2"
time = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion util/network-devp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ rlp = "0.4.0"
secp256k1 = "0.17"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
slab = "0.2"
slab = "0.4.2"
tiny-keccak = "1.4"

[dev-dependencies]
Expand Down
52 changes: 24 additions & 28 deletions util/network-devp2p/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::time::Duration;
use slab::Slab;

use ethereum_types::H256;
use keccak_hash::keccak;
Expand Down Expand Up @@ -55,8 +56,6 @@ use crate::{
session::{Session, SessionData}
};

type Slab<T> = ::slab::Slab<T, usize>;

const MAX_SESSIONS: usize = 2048 + MAX_HANDSHAKES;
const MAX_HANDSHAKES: usize = 1024;

Expand Down Expand Up @@ -153,7 +152,7 @@ impl<'s> NetworkContextTrait for NetworkContext<'s> {
let session = self.resolve_session(peer);
if let Some(session) = session {
session.lock().send_packet(self.io, Some(protocol), packet_id as u8, &data)?;
} else {
} else {
trace!(target: "network", "Send: Peer no longer exist")
}
Ok(())
Expand Down Expand Up @@ -337,7 +336,7 @@ impl Host {
discovery: Mutex::new(None),
udp_socket: Mutex::new(None),
tcp_listener: Mutex::new(tcp_listener),
sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))),
sessions: Arc::new(RwLock::new(Slab::with_capacity(MAX_SESSIONS))),
nodes: RwLock::new(NodeTable::new(path)),
handlers: RwLock::new(HashMap::new()),
timers: RwLock::new(HashMap::new()),
Expand Down Expand Up @@ -397,7 +396,7 @@ impl Host {
// disconnect all non-reserved peers here.
let reserved: HashSet<NodeId> = self.reserved_nodes.read().clone();
let mut to_kill = Vec::new();
for e in self.sessions.read().iter() {
for (_, e) in self.sessions.read().iter() {
let mut s = e.lock();
{
let id = s.id();
Expand Down Expand Up @@ -437,7 +436,7 @@ impl Host {
pub fn stop(&self, io: &IoContext<NetworkIoMessage>) {
self.stopping.store(true, AtomicOrdering::Release);
let mut to_kill = Vec::new();
for e in self.sessions.read().iter() {
for (_, e) in self.sessions.read().iter() {
let mut s = e.lock();
s.disconnect(io, DisconnectReason::ClientQuit);
to_kill.push(s.token());
Expand All @@ -454,7 +453,7 @@ impl Host {
let sessions = self.sessions.read();
let sessions = &*sessions;

let mut peers = Vec::with_capacity(sessions.count());
let mut peers = Vec::with_capacity(sessions.len());
for i in (0..MAX_SESSIONS).map(|x| x + FIRST_SESSION) {
if sessions.get(i).is_some() {
peers.push(i);
Expand Down Expand Up @@ -531,15 +530,15 @@ impl Host {
}

fn have_session(&self, id: &NodeId) -> bool {
self.sessions.read().iter().any(|e| e.lock().info.id == Some(*id))
self.sessions.read().iter().any(|(_, e)| e.lock().info.id == Some(*id))
}

// returns (handshakes, egress, ingress)
fn session_count(&self) -> (usize, usize, usize) {
let mut handshakes = 0;
let mut egress = 0;
let mut ingress = 0;
for s in self.sessions.read().iter() {
for (_, s) in self.sessions.read().iter() {
match s.try_lock() {
Some(ref s) if s.is_ready() && s.info.originated => egress += 1,
Some(ref s) if s.is_ready() && !s.info.originated => ingress += 1,
Expand All @@ -550,12 +549,12 @@ impl Host {
}

fn connecting_to(&self, id: &NodeId) -> bool {
self.sessions.read().iter().any(|e| e.lock().id() == Some(id))
self.sessions.read().iter().any(|(_, e)| e.lock().id() == Some(id))
}

fn keep_alive(&self, io: &IoContext<NetworkIoMessage>) {
let mut to_kill = Vec::new();
for e in self.sessions.read().iter() {
for (_, e) in self.sessions.read().iter() {
let mut s = e.lock();
if !s.keep_alive(io) {
s.disconnect(io, DisconnectReason::PingTimeout);
Expand Down Expand Up @@ -672,21 +671,18 @@ impl Host {
let nonce = self.info.write().next_nonce();
let mut sessions = self.sessions.write();

let token = sessions.insert_with_opt(|token| {
trace!(target: "network", "{}: Initiating session {:?}", token, id);
match Session::new(io, socket, token, id, &nonce, &self.info.read()) {
Ok(s) => Some(Arc::new(Mutex::new(s))),
Err(e) => {
debug!(target: "network", "Session create error: {:?}", e);
None
}
}
});
let entry = sessions.vacant_entry();
let key = entry.key();
ordian marked this conversation as resolved.
Show resolved Hide resolved

match token {
Some(t) => io.register_stream(t).map(|_| ()).map_err(Into::into),
None => {
debug!(target: "network", "Max sessions reached");
trace!(target: "network", "{}: Initiating session {:?}", key, id);

match Session::new(io, socket, key, id, &nonce, &self.info.read()) {
Ok(session) => {
entry.insert(Arc::new(Mutex::new(session)));
io.register_stream(key).map(|_| ()).map_err(Into::into)
},
Err(e) => {
debug!(target: "network", "Session create error: {:?}", e);
Ok(())
}
}
Expand Down Expand Up @@ -852,7 +848,7 @@ impl Host {

let handlers = self.handlers.read();
if !ready_data.is_empty() {
let duplicate = self.sessions.read().iter().any(|e| {
let duplicate = self.sessions.read().iter().any(|(_, e)| {
let session = e.lock();
session.token() != token && session.info.id == ready_id
});
Expand Down Expand Up @@ -959,7 +955,7 @@ impl Host {
if !s.expired() {
if s.is_ready() {
for (p, _) in self.handlers.read().iter() {
if s.have_capability(*p) {
if s.have_capability(*p) {
to_disconnect.push(*p);
}
}
Expand Down Expand Up @@ -990,7 +986,7 @@ impl Host {
let mut to_remove: Vec<PeerId> = Vec::new();
{
let sessions = self.sessions.read();
for c in sessions.iter() {
for (_, c) in sessions.iter() {
let s = c.lock();
if let Some(id) = s.id() {
if node_changes.removed.contains(id) {
Expand Down