Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Auto detect available port #782

Merged
merged 2 commits into from
Mar 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion ethcore/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ mod tests {
fn it_can_be_started() {
let spec = get_test_spec();
let temp_path = RandomTempPath::new();
let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_with_port(40456), &temp_path.as_path());
let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path());
assert!(service.is_ok());
}
}
93 changes: 53 additions & 40 deletions util/src/network/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ impl NetworkConfiguration {
config.listen_address = Some(SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap());
config
}

/// Create new default configuration for localhost-only connection with random port (usefull for testing)
pub fn new_local() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::from_str("127.0.0.1:0").unwrap());
config.nat_enabled = false;
config
}
}

// Tokens
Expand Down Expand Up @@ -269,12 +277,12 @@ pub struct HostInfo {
pub protocol_version: u32,
/// Client identifier
pub client_version: String,
/// TCP connection port.
pub listen_port: u16,
/// Registered capabilities (handlers)
pub capabilities: Vec<CapabilityInfo>,
/// Local address + discovery port
pub local_endpoint: NodeEndpoint,
/// Public address + discovery port
public_endpoint: NodeEndpoint,
pub public_endpoint: Option<NodeEndpoint>,
}

impl HostInfo {
Expand Down Expand Up @@ -307,7 +315,7 @@ struct ProtocolTimer {
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
pub struct Host<Message> where Message: Send + Sync + Clone {
pub info: RwLock<HostInfo>,
tcp_listener: Mutex<Option<TcpListener>>,
tcp_listener: Mutex<TcpListener>,
handshakes: Arc<RwLock<Slab<SharedHandshake>>>,
sessions: Arc<RwLock<Slab<SharedSession>>>,
discovery: Mutex<Option<Discovery>>,
Expand All @@ -321,13 +329,12 @@ pub struct Host<Message> where Message: Send + Sync + Clone {

impl<Message> Host<Message> where Message: Send + Sync + Clone {
/// Create a new instance
pub fn new(config: NetworkConfiguration) -> Host<Message> {
let listen_address = match config.listen_address {
pub fn new(config: NetworkConfiguration) -> Result<Host<Message>, UtilError> {
let mut listen_address = match config.listen_address {
None => SocketAddr::from_str("0.0.0.0:30304").unwrap(),
Some(addr) => addr,
};

let udp_port = config.udp_port.unwrap_or(listen_address.port());
let keys = if let Some(ref secret) = config.use_secret {
KeyPair::from_secret(secret.clone()).unwrap()
} else {
Expand All @@ -342,20 +349,25 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|s| KeyPair::from_secret(s).expect("Error creating node secret key"))
};
let path = config.config_path.clone();
// Setup the server socket
let tcp_listener = try!(TcpListener::bind(&listen_address));
listen_address = SocketAddr::new(listen_address.ip(), try!(tcp_listener.local_addr()).port());
let udp_port = config.udp_port.unwrap_or(listen_address.port());
let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port };

let mut host = Host::<Message> {
info: RwLock::new(HostInfo {
keys: keys,
config: config,
nonce: H256::random(),
protocol_version: PROTOCOL_VERSION,
client_version: version(),
listen_port: 0,
capabilities: Vec::new(),
public_endpoint: local_endpoint, // will be replaced by public once it is resolved
public_endpoint: None,
local_endpoint: local_endpoint,
}),
discovery: Mutex::new(None),
tcp_listener: Mutex::new(None),
tcp_listener: Mutex::new(tcp_listener),
handshakes: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_HANDSHAKE, MAX_HANDSHAKES))),
sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))),
nodes: RwLock::new(NodeTable::new(path)),
Expand All @@ -365,14 +377,12 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
stats: Arc::new(NetworkStats::default()),
pinned_nodes: Vec::new(),
};
let port = listen_address.port();
host.info.write().unwrap().deref_mut().listen_port = port;

let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
for n in boot_nodes {
host.add_node(&n);
}
host
Ok(host)
}

pub fn stats(&self) -> Arc<NetworkStats> {
Expand All @@ -397,50 +407,50 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
self.info.read().unwrap().client_version.clone()
}

pub fn client_url(&self) -> String {
format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.info.read().unwrap().public_endpoint.clone()))
pub fn external_url(&self) -> Option<String> {
self.info.read().unwrap().public_endpoint.as_ref().map(|e| format!("{}", Node::new(self.info.read().unwrap().id().clone(), e.clone())))
}

pub fn local_url(&self) -> String {
let r = format!("{}", Node::new(self.info.read().unwrap().id().clone(), self.info.read().unwrap().local_endpoint.clone()));
println!("{}", r);
r
}

fn init_public_interface(&self, io: &IoContext<NetworkIoMessage<Message>>) {
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
io.clear_timer(INIT_PUBLIC).unwrap();
let mut tcp_listener = self.tcp_listener.lock().unwrap();
if tcp_listener.is_some() {
return;
if self.info.read().unwrap().public_endpoint.is_some() {
return Ok(());
}
// public_endpoint in host info contains local adderss at this point
let listen_address = self.info.read().unwrap().public_endpoint.address.clone();
let udp_port = self.info.read().unwrap().config.udp_port.unwrap_or(listen_address.port());
let local_endpoint = self.info.read().unwrap().local_endpoint.clone();
let public_address = self.info.read().unwrap().config.public_address.clone();
let public_endpoint = match public_address {
None => {
let public_address = select_public_address(listen_address.port());
let local_endpoint = NodeEndpoint { address: public_address, udp_port: udp_port };
let public_address = select_public_address(local_endpoint.address.port());
let public_endpoint = NodeEndpoint { address: public_address, udp_port: local_endpoint.udp_port };
if self.info.read().unwrap().config.nat_enabled {
match map_external_address(&local_endpoint) {
Some(endpoint) => {
info!("NAT mappped to external address {}", endpoint.address);
info!("NAT mapped to external address {}", endpoint.address);
endpoint
},
None => local_endpoint
None => public_endpoint
}
} else {
local_endpoint
public_endpoint
}
}
Some(addr) => NodeEndpoint { address: addr, udp_port: udp_port }
Some(addr) => NodeEndpoint { address: addr, udp_port: local_endpoint.udp_port }
};

// Setup the server socket
*tcp_listener = Some(TcpListener::bind(&listen_address).unwrap());
self.info.write().unwrap().public_endpoint = public_endpoint.clone();
io.register_stream(TCP_ACCEPT).expect("Error registering TCP listener");
info!("Public node URL: {}", self.client_url());
self.info.write().unwrap().public_endpoint = Some(public_endpoint.clone());
info!("Public node URL: {}", self.external_url().unwrap());

// Initialize discovery.
let discovery = {
let info = self.info.read().unwrap();
if info.config.discovery_enabled && !info.config.pin {
Some(Discovery::new(&info.keys, listen_address.clone(), public_endpoint, DISCOVERY))
Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY))
} else { None }
};

Expand All @@ -454,6 +464,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
*self.discovery.lock().unwrap().deref_mut() = Some(discovery);
}
try!(io.register_stream(TCP_ACCEPT));
Ok(())
}

fn maintain_network(&self, io: &IoContext<NetworkIoMessage<Message>>) {
Expand Down Expand Up @@ -567,7 +579,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn accept(&self, io: &IoContext<NetworkIoMessage<Message>>) {
trace!(target: "network", "Accepting incoming connection");
loop {
let socket = match self.tcp_listener.lock().unwrap().as_ref().unwrap().accept() {
let socket = match self.tcp_listener.lock().unwrap().accept() {
Ok(None) => break,
Ok(Some((sock, _addr))) => sock,
Err(e) => {
Expand Down Expand Up @@ -861,7 +873,8 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
fn timeout(&self, io: &IoContext<NetworkIoMessage<Message>>, token: TimerToken) {
match token {
IDLE => self.maintain_network(io),
INIT_PUBLIC => self.init_public_interface(io),
INIT_PUBLIC => self.init_public_interface(io).unwrap_or_else(|e|
warn!("Error initializing public interface: {:?}", e)),
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.connection_timeout(token, io),
DISCOVERY_REFRESH => {
Expand Down Expand Up @@ -945,7 +958,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
}
}
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().as_ref().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
_ => warn!("Unexpected stream registration")
}
}
Expand Down Expand Up @@ -986,7 +999,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
}
}
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().as_ref().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
_ => warn!("Unexpected stream update")
}
}
Expand Down Expand Up @@ -1054,6 +1067,6 @@ fn host_client_url() {
let mut config = NetworkConfiguration::new();
let key = h256_from_hex("6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2");
config.use_secret = Some(key);
let host: Host<u32> = Host::new(config);
assert!(host.client_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@"));
let host: Host<u32> = Host::new(config).unwrap();
assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@"));
}
1 change: 1 addition & 0 deletions util/src/network/ip_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ fn can_select_public_address() {
assert!(pub_address.port() == 40477);
}

#[ignore]
#[test]
fn can_map_external_address_or_fail() {
let pub_address = select_public_address(40478);
Expand Down
2 changes: 1 addition & 1 deletion util/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
//! }
//!
//! fn main () {
//! let mut service = NetworkService::<MyMessage>::start(NetworkConfiguration::new_with_port(40412)).expect("Error creating network service");
//! let mut service = NetworkService::<MyMessage>::start(NetworkConfiguration::new_local()).expect("Error creating network service");
//! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]);
//!
//! // Wait for quit condition
Expand Down
21 changes: 16 additions & 5 deletions util/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use io::*;
pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static {
io_service: IoService<NetworkIoMessage<Message>>,
host_info: String,
host: Arc<Host<Message>>,
stats: Arc<NetworkStats>,
panic_handler: Arc<PanicHandler>
}
Expand All @@ -39,15 +40,16 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
panic_handler.forward_from(&io_service);

let host = Arc::new(Host::new(config));
let host = Arc::new(try!(Host::new(config)));
let stats = host.stats().clone();
let host_info = host.client_version();
try!(io_service.register_handler(host));
try!(io_service.register_handler(host.clone()));
Ok(NetworkService {
io_service: io_service,
host_info: host_info,
stats: stats,
panic_handler: panic_handler
panic_handler: panic_handler,
host: host,
})
}

Expand All @@ -71,12 +73,21 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
&mut self.io_service
}

/// Returns underlying io service.
/// Returns network statistics.
pub fn stats(&self) -> &NetworkStats {
&self.stats
}
}

/// Returns external url if available.
pub fn external_url(&self) -> Option<String> {
self.host.external_url()
}

/// Returns external url if available.
pub fn local_url(&self) -> String {
self.host.local_url()
}
}

impl<Message> MayPanic for NetworkService<Message> where Message: Send + Sync + Clone + 'static {
fn on_panic<F>(&self, closure: F) where F: OnPanicListener {
Expand Down
2 changes: 1 addition & 1 deletion util/src/network/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl Session {
.append(&host.protocol_version)
.append(&host.client_version)
.append(&host.capabilities)
.append(&host.listen_port)
.append(&host.local_endpoint.address.port())
.append(host.id());
self.connection.send_packet(&rlp.out())
}
Expand Down
22 changes: 10 additions & 12 deletions util/src/network/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,21 @@ impl NetworkProtocolHandler<TestProtocolMessage> for TestProtocol {

#[test]
fn net_service() {
let mut service = NetworkService::<TestProtocolMessage>::start(NetworkConfiguration::new_with_port(40414)).expect("Error creating network service");
let mut service = NetworkService::<TestProtocolMessage>::start(NetworkConfiguration::new_local()).expect("Error creating network service");
service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap();
}

#[test]
fn net_connect() {
::log::init_log();
let key1 = KeyPair::create().unwrap();
let mut config1 = NetworkConfiguration::new_with_port(30354);
let mut config1 = NetworkConfiguration::new_local();
config1.use_secret = Some(key1.secret().clone());
config1.nat_enabled = false;
config1.boot_nodes = vec![ ];
let mut config2 = NetworkConfiguration::new_with_port(30355);
config2.boot_nodes = vec![ format!("enode://{}@127.0.0.1:30354", key1.public().hex()) ];
config2.nat_enabled = false;
let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap();
let mut config2 = NetworkConfiguration::new_local();
info!("net_connect: local URL: {}", service1.local_url());
config2.boot_nodes = vec![ service1.local_url() ];
let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap();
let handler1 = TestProtocol::register(&mut service1, false);
let handler2 = TestProtocol::register(&mut service2, false);
Expand All @@ -125,14 +125,12 @@ fn net_connect() {
#[test]
fn net_disconnect() {
let key1 = KeyPair::create().unwrap();
let mut config1 = NetworkConfiguration::new_with_port(30364);
let mut config1 = NetworkConfiguration::new_local();
config1.use_secret = Some(key1.secret().clone());
config1.nat_enabled = false;
config1.boot_nodes = vec![ ];
let mut config2 = NetworkConfiguration::new_with_port(30365);
config2.boot_nodes = vec![ format!("enode://{}@127.0.0.1:30364", key1.public().hex()) ];
config2.nat_enabled = false;
let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap();
let mut config2 = NetworkConfiguration::new_local();
config2.boot_nodes = vec![ service1.local_url() ];
let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap();
let handler1 = TestProtocol::register(&mut service1, false);
let handler2 = TestProtocol::register(&mut service2, true);
Expand All @@ -145,7 +143,7 @@ fn net_disconnect() {

#[test]
fn net_timeout() {
let config = NetworkConfiguration::new_with_port(30346);
let config = NetworkConfiguration::new_local();
let mut service = NetworkService::<TestProtocolMessage>::start(config).unwrap();
let handler = TestProtocol::register(&mut service, false);
while !handler.got_timeout() {
Expand Down