Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

UDP Discovery #440

Merged
merged 55 commits into from
Feb 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
09b6503
Discovery packets
arkpar Feb 12, 2016
48924f4
Merge branch 'master' of github.com:ethcore/parity into discovery
arkpar Feb 12, 2016
2af379d
Merge branch 'net' of github.com:ethcore/parity into discovery
arkpar Feb 12, 2016
62b9f4b
UDP discovery working
arkpar Feb 13, 2016
76ea030
Small refactoring
arkpar Feb 14, 2016
9768fdd
Homestead block set to 1100000
arkpar Feb 14, 2016
2d89708
Reduced thread contention
arkpar Feb 14, 2016
718646f
Refactored host to use different containers for handshakes and sessions
arkpar Feb 14, 2016
7503d66
Fixed panic on session creation
arkpar Feb 14, 2016
dee375b
Handle session creation error
arkpar Feb 14, 2016
fc7483a
Propagate only one last hash for peers that are too far behind
arkpar Feb 14, 2016
61c52f1
Fixed panic on accessing expired node
arkpar Feb 14, 2016
38f4a06
Fixed panic on accessing expired node
arkpar Feb 14, 2016
986448c
Merge branch 'net' into discovery
arkpar Feb 14, 2016
186c758
Node table persistency
arkpar Feb 15, 2016
ba95260
Merge branch 'master' of github.com:ethcore/parity into discovery
arkpar Feb 15, 2016
cf45d59
Node table tests
arkpar Feb 15, 2016
4d40991
Discovery test
arkpar Feb 15, 2016
3096892
Merge branch 'master' of github.com:ethcore/parity into discovery
arkpar Feb 15, 2016
0e1e804
Save key to a file
arkpar Feb 15, 2016
4b9c7f7
Add incoming connection to node table
arkpar Feb 15, 2016
01a83e6
Populate discovery from node table
arkpar Feb 15, 2016
a2c0508
Merge branch 'master' of github.com:ethcore/parity into discovery
arkpar Feb 15, 2016
0bef355
Removed temp test code
arkpar Feb 15, 2016
64913d5
Additional address filter
arkpar Feb 15, 2016
fb0b5b2
Raise fd limit in linux
arkpar Feb 15, 2016
dbf3691
Return nothing on state requests instead of panicing
arkpar Feb 16, 2016
681350b
Merge branch 'discovery' of github.com:ethcore/parity into discovery
arkpar Feb 16, 2016
2039473
Get public address/UPNP refactoring
arkpar Feb 16, 2016
f771306
Get public address/UPNP refactoring
arkpar Feb 16, 2016
58fdfe7
Handle pinning and enable_discovery options
arkpar Feb 16, 2016
c5ca293
Merge branch 'master' of github.com:ethcore/parity into discovery
arkpar Feb 16, 2016
fa316c2
Merge branch 'discovery' of github.com:ethcore/parity into discovery
arkpar Feb 16, 2016
33e8d74
Max handhsakes reached is now a debug warning
arkpar Feb 16, 2016
d95e971
Prevent deadlocks
arkpar Feb 16, 2016
a4ea073
Fixed some tests
arkpar Feb 16, 2016
f4fa747
ip_utils tests
arkpar Feb 16, 2016
217cbec
Disconnect test
arkpar Feb 16, 2016
4f73d63
Tweaked CLI options
arkpar Feb 16, 2016
fbe06d3
More tests
arkpar Feb 16, 2016
7e0dfb4
Minor test tweaks and code cleanup
arkpar Feb 16, 2016
b6ccbdb
Lower max handshakes to reduce network load
arkpar Feb 16, 2016
0b916e0
Merge branch 'master' into discovery
NikVolf Feb 17, 2016
39a98cd
Prevent connection deletion until unregister is called; minor tweaks
arkpar Feb 17, 2016
a179722
Merge branch 'discovery' of github.com:ethcore/parity into discovery
arkpar Feb 17, 2016
e4baf37
Fixed adding boot nodes to discovery table; Ping optimization
arkpar Feb 17, 2016
eef193e
Don't add useless peers to table
arkpar Feb 17, 2016
c9f3f5e
Tweaked connection limits to be a bit more aggressive
arkpar Feb 17, 2016
0cfc4cb
More tests
arkpar Feb 17, 2016
e0623f5
Merge branch 'master' of github.com:ethcore/parity into discovery
arkpar Feb 18, 2016
d9fec87
Merge branch 'master' of github.com:ethcore/parity into discovery
arkpar Feb 19, 2016
1d60d82
Merge branch 'master' of github.com:ethcore/parity into discovery
arkpar Feb 19, 2016
85c842b
Restored service test
arkpar Feb 19, 2016
beab90c
Added is_valid_node_url
arkpar Feb 19, 2016
ab233a9
Slightly improved tests
arkpar Feb 19, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions ethcore/src/block_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,19 @@ impl BlockQueue {
}

fn verify(verification: Arc<Mutex<Verification>>, engine: Arc<Box<Engine>>, wait: Arc<Condvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<Condvar>) {
while !deleting.load(AtomicOrdering::Relaxed) {
while !deleting.load(AtomicOrdering::Acquire) {
{
let mut lock = verification.lock().unwrap();

if lock.unverified.is_empty() && lock.verifying.is_empty() {
empty.notify_all();
}

while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) {
while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Acquire) {
lock = wait.wait(lock).unwrap();
}

if deleting.load(AtomicOrdering::Relaxed) {
if deleting.load(AtomicOrdering::Acquire) {
return;
}
}
Expand Down Expand Up @@ -347,7 +347,7 @@ impl MayPanic for BlockQueue {
impl Drop for BlockQueue {
fn drop(&mut self) {
self.clear();
self.deleting.store(true, AtomicOrdering::Relaxed);
self.deleting.store(true, AtomicOrdering::Release);
self.more_to_verify.notify_all();
for t in self.verifiers.drain(..) {
t.join().unwrap();
Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,11 @@ impl BlockChainClient for Client {
}

fn state_data(&self, _hash: &H256) -> Option<Bytes> {
unimplemented!();
None
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prevents panic when someone requests NodeData from us

}

fn block_receipts(&self, _hash: &H256) -> Option<Bytes> {
unimplemented!();
None
}

fn import_block(&self, bytes: Bytes) -> ImportResult {
Expand Down
5 changes: 1 addition & 4 deletions ethcore/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {
}
}

// TODO: rewrite into something that doesn't dependent on the testing environment having a particular port ready for use.
/*
#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -136,8 +134,7 @@ mod tests {
fn it_can_be_started() {
let spec = get_test_spec();
let temp_path = RandomTempPath::new();
let service = ClientService::start(spec, NetworkConfiguration::new(), &temp_path.as_path());
let service = ClientService::start(spec, NetworkConfiguration::new_with_port(40456), &temp_path.as_path());
assert!(service.is_ok());
}
}
*/
44 changes: 28 additions & 16 deletions parity/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ extern crate ethcore_rpc as rpc;

use std::net::{SocketAddr};
use std::env;
use std::path::PathBuf;
use rlog::{LogLevelFilter};
use env_logger::LogBuilder;
use ctrlc::CtrlC;
Expand Down Expand Up @@ -69,8 +70,10 @@ Options:

--no-bootstrap Don't bother trying to connect to any nodes initially.
--listen-address URL Specify the IP/port on which to listen for peers [default: 0.0.0.0:30304].
--public-address URL Specify the IP/port on which peers may connect [default: 0.0.0.0:30304].
--public-address URL Specify the IP/port on which peers may connect.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defaults are now determined by the networking layer and not by docopts.
"0.0.0.0" public address does not make sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does the user know which the default listen port is?

--address URL Equivalent to --listen-address URL --public-address URL.
--peers NUM Try to manintain that many peers [default: 25].
--no-discovery Disable new peer discovery.
--upnp Use UPnP to try to figure out the correct network settings.
--node-key KEY Specify node secret key as hex string.

Expand All @@ -95,8 +98,10 @@ struct Args {
flag_keys_path: String,
flag_no_bootstrap: bool,
flag_listen_address: String,
flag_public_address: String,
flag_public_address: Option<String>,
flag_address: Option<String>,
flag_peers: u32,
flag_no_discovery: bool,
flag_upnp: bool,
flag_node_key: Option<String>,
flag_cache_pref_size: usize,
Expand Down Expand Up @@ -165,7 +170,7 @@ impl Configuration {
self.args.flag_db_path.replace("$HOME", env::home_dir().unwrap().to_str().unwrap())
}

fn keys_path(&self) -> String {
fn _keys_path(&self) -> String {
self.args.flag_keys_path.replace("$HOME", env::home_dir().unwrap().to_str().unwrap())
}

Expand All @@ -187,21 +192,23 @@ impl Configuration {
}
}

fn net_addresses(&self) -> (SocketAddr, SocketAddr) {
let listen_address;
let public_address;
fn net_addresses(&self) -> (Option<SocketAddr>, Option<SocketAddr>) {
let mut listen_address = None;
let mut public_address = None;

match self.args.flag_address {
None => {
listen_address = SocketAddr::from_str(self.args.flag_listen_address.as_ref()).expect("Invalid listen address given with --listen-address");
public_address = SocketAddr::from_str(self.args.flag_public_address.as_ref()).expect("Invalid public address given with --public-address");
}
Some(ref a) => {
public_address = SocketAddr::from_str(a.as_ref()).expect("Invalid listen/public address given with --address");
listen_address = public_address;
if let Some(ref a) = self.args.flag_address {
public_address = Some(SocketAddr::from_str(a.as_ref()).expect("Invalid listen/public address given with --address"));
listen_address = public_address;
}
if listen_address.is_none() {
listen_address = Some(SocketAddr::from_str(self.args.flag_listen_address.as_ref()).expect("Invalid listen address given with --listen-address"));
}
if let Some(ref a) = self.args.flag_public_address {
if public_address.is_some() {
panic!("Conflicting flags: --address and --public-address");
}
};

public_address = Some(SocketAddr::from_str(a.as_ref()).expect("Invalid listen address given with --public-address"));
}
(listen_address, public_address)
}

Expand Down Expand Up @@ -236,6 +243,11 @@ impl Configuration {
net_settings.listen_address = listen;
net_settings.public_address = public;
net_settings.use_secret = self.args.flag_node_key.as_ref().map(|s| Secret::from_str(&s).expect("Invalid key string"));
net_settings.discovery_enabled = !self.args.flag_no_discovery;
net_settings.ideal_peers = self.args.flag_peers;
let mut net_path = PathBuf::from(&self.path());
net_path.push("network");
net_settings.config_path = Some(net_path.to_str().unwrap().to_owned());

// Build client
let mut service = ClientService::start(spec, net_settings, &Path::new(&self.path())).unwrap();
Expand Down
8 changes: 4 additions & 4 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ const RECEIPTS_PACKET: u8 = 0x10;

const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent

const CONNECTION_TIMEOUT_SEC: f64 = 30f64;
const CONNECTION_TIMEOUT_SEC: f64 = 10f64;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Large timeout and high import speed leads to block queue underrun and sync stalling when waiting for a unresponsive peer for too long.


struct Header {
/// Header data
Expand Down Expand Up @@ -314,7 +314,7 @@ impl ChainSync {
}

self.peers.insert(peer_id.clone(), peer);
info!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
self.sync_peer(io, peer_id, false);
Ok(())
}
Expand Down Expand Up @@ -545,7 +545,7 @@ impl ChainSync {
pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) {
trace!(target: "sync", "== Disconnecting {}", peer);
if self.peers.contains_key(&peer) {
info!(target: "sync", "Disconnected {}", peer);
debug!(target: "sync", "Disconnected {}", peer);
self.clear_peer_download(peer);
self.peers.remove(&peer);
self.continue_sync(io);
Expand Down Expand Up @@ -1179,7 +1179,7 @@ impl ChainSync {
for (peer_id, peer_number) in updated_peers {
let mut peer_best = self.peers.get(&peer_id).unwrap().latest_hash.clone();
if best_number - peer_number > MAX_PEERS_PROPAGATION as BlockNumber {
// If we think peer is too far behind just end one latest hash
// If we think peer is too far behind just send one latest hash
peer_best = last_parent.clone();
}
sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &peer_best, &local_best) {
Expand Down
1 change: 1 addition & 0 deletions util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ clippy = { version = "0.0.42", optional = true }
json-tests = { path = "json-tests" }
target_info = "0.1.0"
igd = "0.4.2"
libc = "0.2.7"

[features]
default = []
Expand Down
25 changes: 24 additions & 1 deletion util/fdlimit/src/raise_fd_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,28 @@ pub unsafe fn raise_fd_limit() {
}
}

#[cfg(not(any(target_os = "macos", target_os = "ios")))]
#[cfg(any(target_os = "linux"))]
#[allow(non_camel_case_types)]
pub unsafe fn raise_fd_limit() {
use libc;
use std::io;

// Fetch the current resource limits
let mut rlim = libc::rlimit{rlim_cur: 0, rlim_max: 0};
if libc::getrlimit(libc::RLIMIT_NOFILE, &mut rlim) != 0 {
let err = io::Error::last_os_error();
panic!("raise_fd_limit: error calling getrlimit: {}", err);
}

// Set soft limit to hard imit
rlim.rlim_cur = rlim.rlim_max;

// Set our newly-increased resource limit
if libc::setrlimit(libc::RLIMIT_NOFILE, &rlim) != 0 {
let err = io::Error::last_os_error();
panic!("raise_fd_limit: error calling setrlimit: {}", err);
}
}

#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "linux")))]
pub unsafe fn raise_fd_limit() {}
28 changes: 2 additions & 26 deletions util/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,32 +170,8 @@ pub trait BytesConvertable {
fn to_bytes(&self) -> Bytes { self.as_slice().to_vec() }
}

impl<'a> BytesConvertable for &'a [u8] {
fn bytes(&self) -> &[u8] { self }
}

impl BytesConvertable for Vec<u8> {
fn bytes(&self) -> &[u8] { self }
}

impl BytesConvertable for String {
fn bytes(&self) -> &[u8] { &self.as_bytes() }
}

macro_rules! impl_bytes_convertable_for_array {
($zero: expr) => ();
($len: expr, $($idx: expr),*) => {
impl BytesConvertable for [u8; $len] {
fn bytes(&self) -> &[u8] { self }
}
impl_bytes_convertable_for_array! { $($idx),* }
}
}

// -1 at the end is not expanded
impl_bytes_convertable_for_array! {
32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16,
15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, -1
impl<T> BytesConvertable for T where T: AsRef<[u8]> {
fn bytes(&self) -> &[u8] { self.as_ref() }
}

#[test]
Expand Down
13 changes: 7 additions & 6 deletions util/src/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,6 @@ macro_rules! impl_hash {
/// Unformatted binary data of fixed length.
pub struct $from (pub [u8; $size]);

impl BytesConvertable for $from {
fn bytes(&self) -> &[u8] {
&self.0
}
}

impl Deref for $from {
type Target = [u8];

Expand All @@ -92,6 +86,13 @@ macro_rules! impl_hash {
}
}

impl AsRef<[u8]> for $from {
#[inline]
fn as_ref(&self) -> &[u8] {
&self.0
}
}

impl DerefMut for $from {
#[inline]
fn deref_mut(&mut self) -> &mut [u8] {
Expand Down
5 changes: 5 additions & 0 deletions util/src/io/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
IoMessage::DeregisterStream { handler_id, token } => {
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
handler.deregister_stream(token, event_loop);
// unregister a timer associated with the token (if any)
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) {
event_loop.clear_timeout(timer.timeout);
}
},
IoMessage::UpdateStreamRegistration { handler_id, token } => {
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
Expand Down
15 changes: 11 additions & 4 deletions util/src/io/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct Worker {
thread: Option<JoinHandle<()>>,
wait: Arc<Condvar>,
deleting: Arc<AtomicBool>,
wait_mutex: Arc<Mutex<()>>,
}

impl Worker {
Expand All @@ -61,6 +62,7 @@ impl Worker {
thread: None,
wait: wait.clone(),
deleting: deleting.clone(),
wait_mutex: wait_mutex.clone(),
};
worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn(
move || {
Expand All @@ -77,13 +79,17 @@ impl Worker {
wait_mutex: Arc<Mutex<()>>,
deleting: Arc<AtomicBool>)
where Message: Send + Sync + Clone + 'static {
while !deleting.load(AtomicOrdering::Relaxed) {
loop {
{
let lock = wait_mutex.lock().unwrap();
let _ = wait.wait(lock).unwrap();
if deleting.load(AtomicOrdering::Relaxed) {
if deleting.load(AtomicOrdering::Acquire) {
return;
}
let _ = wait.wait(lock).unwrap();
}

if deleting.load(AtomicOrdering::Acquire) {
return;
}
while let chase_lev::Steal::Data(work) = stealer.steal() {
Worker::do_work(work, channel.clone());
Expand Down Expand Up @@ -114,7 +120,8 @@ impl Worker {

impl Drop for Worker {
fn drop(&mut self) {
self.deleting.store(true, AtomicOrdering::Relaxed);
let _ = self.wait_mutex.lock();
self.deleting.store(true, AtomicOrdering::Release);
self.wait.notify_all();
let thread = mem::replace(&mut self.thread, None).unwrap();
thread.join().ok();
Expand Down
1 change: 1 addition & 0 deletions util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ extern crate serde;
#[macro_use]
extern crate log as rlog;
extern crate igd;
extern crate libc;

pub mod standard;
#[macro_use]
Expand Down