98 changes: 90 additions & 8 deletions src/main/connection_listener/exchange_msg.rs
Expand Up @@ -16,15 +16,18 @@
// relating to use of the SAFE Network Software.


use common::{self, Core, CoreTimerId, Message, NameHash, Priority, Socket, State};
use super::check_reachability::CheckReachability;
use common::{self, Core, CoreTimerId, ExternalReachability, Message, NameHash, Priority, Socket,
State};
use main::{ActiveConnection, ConnectionCandidate, ConnectionId, ConnectionMap, Event, PeerId};
use mio::{EventLoop, EventSet, PollOpt, Timeout, Token};
use rust_sodium::crypto::box_::PublicKey;
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashSet;
use std::collections::hash_map::Entry;
use std::mem;
use std::rc::Rc;
use std::rc::{Rc, Weak};

pub const EXCHANGE_MSG_TIMEOUT_MS: u64 = 10 * 60 * 1000;

Expand All @@ -37,6 +40,8 @@ pub struct ExchangeMsg {
our_pk: PublicKey,
socket: Socket,
timeout: Timeout,
reachability_children: HashSet<Token>,
self_weak: Weak<RefCell<ExchangeMsg>>,
}

impl ExchangeMsg {
Expand All @@ -57,7 +62,7 @@ impl ExchangeMsg {
let timeout = el.timeout_ms(CoreTimerId::new(token, 0),
timeout_ms.unwrap_or(EXCHANGE_MSG_TIMEOUT_MS))?;

let state = ExchangeMsg {
let state = Rc::new(RefCell::new(ExchangeMsg {
token: token,
cm: cm,
event_tx: event_tx,
Expand All @@ -66,17 +71,21 @@ impl ExchangeMsg {
our_pk: our_pk,
socket: socket,
timeout: timeout,
};
reachability_children: HashSet::with_capacity(4),
self_weak: Default::default(),
}));

state.borrow_mut().self_weak = Rc::downgrade(&state);

let _ = core.insert_state(token, Rc::new(RefCell::new(state)));
let _ = core.insert_state(token, state);

Ok(())
}

fn read(&mut self, core: &mut Core, el: &mut EventLoop<Core>) {
match self.socket.read::<Message>() {
Ok(Some(Message::BootstrapRequest(their_public_key, name_hash))) => {
self.handle_bootstrap_req(core, el, their_public_key, name_hash)
Ok(Some(Message::BootstrapRequest(their_public_key, name_hash, ext_reachability))) => {
self.handle_bootstrap_req(core, el, their_public_key, name_hash, ext_reachability)
}
Ok(Some(Message::Connect(their_public_key, name_hash))) => {
self.handle_connect(core, el, their_public_key, name_hash)
Expand All @@ -98,7 +107,74 @@ impl ExchangeMsg {
core: &mut Core,
el: &mut EventLoop<Core>,
their_public_key: PublicKey,
name_hash: NameHash) {
name_hash: NameHash,
ext_reachability: ExternalReachability) {
match ext_reachability {
ExternalReachability::Required { direct_listeners } => {
let state_data = StateData {
their_public_key: their_public_key,
name_hash: name_hash,
};
for their_listener in direct_listeners {
let self_weak = self.self_weak.clone();
let finish = move |core: &mut Core, el: &mut EventLoop<Core>, child, res| {
if let Some(self_rc) = self_weak.upgrade() {
self_rc.borrow_mut().handle_check_reachability(core, el, child, res)
}
};

if let Ok(child) = CheckReachability::<StateData>::start(core,
el,
their_listener,
state_data,
Box::new(finish)) {
let _ = self.reachability_children.insert(child);
}
}
if self.reachability_children.is_empty() {
debug!("Bootstrapper failed to pass requisite condition of external \
recheability. Denying bootstrap.");
return self.terminate(core, el);
}
}
ExternalReachability::NotRequired => {
self.send_bootstrap_resp(core, el, their_public_key, name_hash)
}
}
}

fn handle_check_reachability(&mut self,
core: &mut Core,
el: &mut EventLoop<Core>,
child: Token,
res: Result<StateData, ()>) {
let _ = self.reachability_children.remove(&child);
if let Ok(state_data) = res {
for child in self.reachability_children.drain() {
let child = match core.get_state(child) {
Some(state) => state,
None => continue,
};

child.borrow_mut().terminate(core, el);
}
return self.send_bootstrap_resp(core,
el,
state_data.their_public_key,
state_data.name_hash);
}
if self.reachability_children.is_empty() {
debug!("Bootstrapper failed to pass requisite condition of external recheability. \
Denying bootstrap.");
self.terminate(core, el);
}
}

fn send_bootstrap_resp(&mut self,
core: &mut Core,
el: &mut EventLoop<Core>,
their_public_key: PublicKey,
name_hash: NameHash) {
let their_id = match self.get_peer_id(their_public_key, name_hash) {
Ok(their_id) => their_id,
Err(()) => return self.terminate(core, el),
Expand Down Expand Up @@ -288,3 +364,9 @@ enum NextState {
ActiveConnection(PeerId),
ConnectionCandidate(PeerId),
}

#[derive(Debug, Clone, Copy)]
struct StateData {
their_public_key: PublicKey,
name_hash: NameHash,
}
24 changes: 15 additions & 9 deletions src/main/connection_listener/mod.rs
Expand Up @@ -15,6 +15,7 @@
// Please review the Licences for the specific language governing permissions and limitations
// relating to use of the SAFE Network Software.

mod check_reachability;
mod exchange_msg;

use self::exchange_msg::ExchangeMsg;
Expand Down Expand Up @@ -164,12 +165,11 @@ impl State for ConnectionListener {
}

#[cfg(test)]
mod test {

mod tests {
use super::*;
use super::exchange_msg::EXCHANGE_MSG_TIMEOUT_MS;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use common::{self, Core, CoreMessage, Message, NameHash};
use common::{self, Core, CoreMessage, ExternalReachability, Message, NameHash};
use maidsafe_utilities;
use maidsafe_utilities::event_sender::MaidSafeEventCategory;
use maidsafe_utilities::serialisation::{deserialise, serialise};
Expand All @@ -180,7 +180,6 @@ mod test {
use rust_sodium::crypto::box_::{self, PublicKey};
use rust_sodium::crypto::hash::sha256;
use rustc_serialize::Decodable;

use std::collections::HashMap;
use std::io::{Cursor, Read, Write};
use std::mem;
Expand Down Expand Up @@ -296,10 +295,14 @@ mod test {
Ok(unwrap!(deserialise(&payload), "Could not deserialise."))
}

fn bootstrap(name_hash: NameHash, pk: PublicKey, listener: Listener) {
fn bootstrap(name_hash: NameHash,
ext_reachability: ExternalReachability,
pk: PublicKey,
listener: Listener) {
let mut us = connect_to_listener(&listener);

let message = unwrap!(serialise(&Message::BootstrapRequest(pk, name_hash)));
let message =
unwrap!(serialise(&Message::BootstrapRequest(pk, name_hash, ext_reachability)));
unwrap!(write(&mut us, message), "Could not write.");

match unwrap!(read(&mut us), "Could not read.") {
Expand Down Expand Up @@ -344,7 +347,7 @@ mod test {
fn bootstrap_with_correct_parameters() {
let listener = start_listener();
let (pk, _) = box_::gen_keypair();
bootstrap(NAME_HASH, pk, listener);
bootstrap(NAME_HASH, ExternalReachability::NotRequired, pk, listener);
}

#[test]
Expand All @@ -359,7 +362,7 @@ mod test {
fn bootstrap_with_invalid_version_hash() {
let listener = start_listener();
let (pk, _) = box_::gen_keypair();
bootstrap(NAME_HASH_2, pk, listener);
bootstrap(NAME_HASH_2, ExternalReachability::NotRequired, pk, listener);
}

#[test]
Expand All @@ -374,7 +377,10 @@ mod test {
#[should_panic]
fn bootstrap_with_invalid_pub_key() {
let listener = start_listener();
bootstrap(NAME_HASH, listener.pk, listener);
bootstrap(NAME_HASH,
ExternalReachability::NotRequired,
listener.pk,
listener);
}

#[test]
Expand Down
105 changes: 68 additions & 37 deletions src/main/service.rs
Expand Up @@ -15,7 +15,7 @@
// Please review the Licences for the specific language governing permissions and limitations
// relating to use of the SAFE Network Software.

use common::{self, Core, CoreMessage, NameHash, Priority};
use common::{self, Core, CoreMessage, CrustUser, ExternalReachability, NameHash, Priority};
use maidsafe_utilities;
use maidsafe_utilities::thread::Joiner;
use main::{ActiveConnection, Bootstrap, Connect, ConnectionId, ConnectionInfoResult,
Expand All @@ -39,6 +39,9 @@ const SERVICE_DISCOVERY_TOKEN: Token = Token(1);
const LISTENER_TOKEN: Token = Token(2);

const SERVICE_DISCOVERY_DEFAULT_PORT: u16 = 5484;

const DISABLE_NAT: bool = true;

/// A structure representing a connection manager. This is the main object through which crust is
/// used.
pub struct Service {
Expand Down Expand Up @@ -237,17 +240,32 @@ impl Service {

/// Start the bootstrapping procedure. It will auto terminate after indicating success or
/// failure via the event channel.
pub fn start_bootstrap(&mut self, blacklist: HashSet<SocketAddr>) -> ::Res<()> {
pub fn start_bootstrap(&mut self,
blacklist: HashSet<SocketAddr>,
crust_user: CrustUser)
-> ::Res<()> {
let config = self.config.clone();
let our_pk = self.our_keys.0;
let name_hash = self.name_hash;
let cm = self.cm.clone();
let event_tx = self.event_tx.clone();
let ext_reachability = match crust_user {
CrustUser::Node => {
ExternalReachability::Required {
direct_listeners: unwrap!(self.our_listeners.lock())
.iter()
.map(|s| common::SocketAddr(*s))
.collect(),
}
}
CrustUser::Client => ExternalReachability::NotRequired,
};

self.post(move |core, el| if core.get_state(BOOTSTRAP_TOKEN).is_none() {
if let Err(e) = Bootstrap::start(core,
el,
name_hash,
ext_reachability,
our_pk,
cm,
&config,
Expand Down Expand Up @@ -354,44 +372,57 @@ impl Service {
/// peer, see `Service::connect` for more info.
// TODO: immediate return in case of sender.send() returned with NotificationError
pub fn prepare_connection_info(&self, result_token: u32) {
let event_tx = self.event_tx.clone();
let our_pub_key = self.our_keys.0;
let our_listeners =
unwrap!(self.our_listeners.lock()).iter().map(|e| common::SocketAddr(*e)).collect();
let mc = self.mc.clone();
if let Err(e) = self.post(move |mut core, mut el| {
let event_tx_clone = event_tx.clone();
match MappedTcpSocket::start(core, el, 0, &mc, move |_, _, socket, addrs| {
let hole_punch_addrs = addrs.into_iter()
.filter(|elt| nat::ip_addr_is_global(&elt.ip()))
.map(common::SocketAddr)
.collect();
let event_tx = event_tx_clone;
let event = Event::ConnectionInfoPrepared(ConnectionInfoResult {
result_token: result_token,
result: Ok(PrivConnectionInfo {
id: PeerId(our_pub_key),
for_direct: our_listeners,
for_hole_punch: hole_punch_addrs,
hole_punch_socket: socket,
}),
});
let _ = event_tx.send(event);
}) {
Ok(()) => (),
Err(e) => {
debug!("Error mapping tcp socket: {}", e);
let _ = event_tx.send(Event::ConnectionInfoPrepared(ConnectionInfoResult {
result_token: result_token,
result: Err(From::from(e)),
}));
}
};
}) {
let _ = self.event_tx.send(Event::ConnectionInfoPrepared(ConnectionInfoResult {
if DISABLE_NAT {
let event = Event::ConnectionInfoPrepared(ConnectionInfoResult {
result_token: result_token,
result: Err(From::from(e)),
}));
result: Ok(PrivConnectionInfo {
id: PeerId(self.our_keys.0),
for_direct: our_listeners,
for_hole_punch: Default::default(),
hole_punch_socket: None,
}),
});
let _ = self.event_tx.send(event);
} else {
let event_tx = self.event_tx.clone();
let our_pub_key = self.our_keys.0;
let mc = self.mc.clone();
if let Err(e) = self.post(move |mut core, mut el| {
let event_tx_clone = event_tx.clone();
match MappedTcpSocket::start(core, el, 0, &mc, move |_, _, socket, addrs| {
let hole_punch_addrs = addrs.into_iter()
.filter(|elt| nat::ip_addr_is_global(&elt.ip()))
.map(common::SocketAddr)
.collect();
let event_tx = event_tx_clone;
let event = Event::ConnectionInfoPrepared(ConnectionInfoResult {
result_token: result_token,
result: Ok(PrivConnectionInfo {
id: PeerId(our_pub_key),
for_direct: our_listeners,
for_hole_punch: hole_punch_addrs,
hole_punch_socket: Some(socket),
}),
});
let _ = event_tx.send(event);
}) {
Ok(()) => (),
Err(e) => {
debug!("Error mapping tcp socket: {}", e);
let _ = event_tx.send(Event::ConnectionInfoPrepared(ConnectionInfoResult {
result_token: result_token,
result: Err(From::from(e)),
}));
}
};
}) {
let _ = self.event_tx.send(Event::ConnectionInfoPrepared(ConnectionInfoResult {
result_token: result_token,
result: Err(From::from(e)),
}));
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/types.rs
Expand Up @@ -85,7 +85,7 @@ pub struct PrivConnectionInfo {
#[doc(hidden)]
pub for_hole_punch: Vec<common::SocketAddr>,
#[doc(hidden)]
pub hole_punch_socket: TcpBuilder,
pub hole_punch_socket: Option<TcpBuilder>,
}

impl PrivConnectionInfo {
Expand Down
3 changes: 0 additions & 3 deletions src/nat/mapped_tcp_socket/mod.rs
Expand Up @@ -113,9 +113,6 @@ impl<F> MappedTcpSocket<F>

// Ask Stuns
for stun in mc.peer_stuns() {
if true {
break;
}
let self_weak = Rc::downgrade(&state);
let handler = move |core: &mut Core, el: &mut EventLoop<Core>, child_token, res| {
if let Some(self_rc) = self_weak.upgrade() {
Expand Down
41 changes: 30 additions & 11 deletions src/tests/mod.rs
Expand Up @@ -19,7 +19,7 @@
pub mod utils;

pub use self::utils::{gen_config, get_event_sender, timebomb};
use common::SocketAddr;
use common::{CrustUser, SocketAddr};
use main::{Config, Event, Service};
use mio;

Expand Down Expand Up @@ -62,7 +62,10 @@ fn bootstrap_two_services_and_exchange_messages() {
let (event_tx1, event_rx1) = get_event_sender();
let mut service1 = unwrap!(Service::with_config(event_tx1, config1));

unwrap!(service1.start_bootstrap(HashSet::new()));
unwrap!(service1.start_listening_tcp());
let _ = expect_event!(event_rx1, Event::ListenerStarted(port) => port);

unwrap!(service1.start_bootstrap(HashSet::new(), CrustUser::Node));

let peer_id0 = expect_event!(event_rx1, Event::BootstrapConnect(peer_id, _) => peer_id);
assert_eq!(peer_id0, service0.id());
Expand Down Expand Up @@ -100,14 +103,17 @@ fn bootstrap_two_services_using_service_discovery() {
let (event_tx1, event_rx1) = get_event_sender();
let mut service1 = unwrap!(Service::with_config(event_tx1, config));

unwrap!(service1.start_listening_tcp());
let _ = expect_event!(event_rx1, Event::ListenerStarted(port) => port);

service0.start_service_discovery();
service0.set_service_discovery_listen(true);
unwrap!(service0.start_listening_tcp());

expect_event!(event_rx0, Event::ListenerStarted(_port));

service1.start_service_discovery();
unwrap!(service1.start_bootstrap(HashSet::new()));
unwrap!(service1.start_bootstrap(HashSet::new(), CrustUser::Client));

let peer_id0 = expect_event!(event_rx1, Event::BootstrapConnect(peer_id, _) => peer_id);
assert_eq!(peer_id0, service0.id());
Expand All @@ -134,7 +140,10 @@ fn bootstrap_with_multiple_contact_endpoints() {

let (event_tx1, event_rx1) = get_event_sender();
let mut service1 = unwrap!(Service::with_config(event_tx1, config1));
unwrap!(service1.start_bootstrap(HashSet::new()));
unwrap!(service1.start_bootstrap(HashSet::new(), CrustUser::Client));

unwrap!(service1.start_listening_tcp());
let _ = expect_event!(event_rx1, Event::ListenerStarted(port) => port);

let peer_id0 = expect_event!(event_rx1, Event::BootstrapConnect(peer_id, _) => peer_id);
assert_eq!(peer_id0, service0.id());
Expand Down Expand Up @@ -163,7 +172,10 @@ fn bootstrap_with_blacklist() {
let mut service1 = unwrap!(Service::with_config(event_tx1, config1));
let mut blacklist = HashSet::new();
blacklist.insert(*blacklisted_address);
unwrap!(service1.start_bootstrap(blacklist));
unwrap!(service1.start_bootstrap(blacklist, CrustUser::Client));

unwrap!(service1.start_listening_tcp());
let _ = expect_event!(event_rx1, Event::ListenerStarted(port) => port);

let peer_id0 = expect_event!(event_rx1, Event::BootstrapConnect(peer_id, _) => peer_id);
assert_eq!(peer_id0, service0.id());
Expand Down Expand Up @@ -193,7 +205,7 @@ fn bootstrap_fails_only_blacklisted_contact() {

let mut blacklist = HashSet::new();
blacklist.insert(*blacklisted_address);
unwrap!(service.start_bootstrap(blacklist));
unwrap!(service.start_bootstrap(blacklist, CrustUser::Client));

expect_event!(event_rx, Event::BootstrapFailed);

Expand All @@ -211,7 +223,7 @@ fn bootstrap_fails_if_there_are_no_contacts() {
let (event_tx, event_rx) = get_event_sender();
let mut service = unwrap!(Service::with_config(event_tx, config));

unwrap!(service.start_bootstrap(HashSet::new()));
unwrap!(service.start_bootstrap(HashSet::new(), CrustUser::Client));
expect_event!(event_rx, Event::BootstrapFailed);
}

Expand All @@ -228,7 +240,7 @@ fn bootstrap_timeouts_if_there_are_only_invalid_contacts() {
let (event_tx, event_rx) = get_event_sender();
let mut service = unwrap!(Service::with_config(event_tx, config));

unwrap!(service.start_bootstrap(HashSet::new()));
unwrap!(service.start_bootstrap(HashSet::new(), CrustUser::Client));
expect_event!(event_rx, Event::BootstrapFailed);
}

Expand All @@ -246,7 +258,11 @@ fn drop_disconnects() {

let (event_tx_1, event_rx_1) = get_event_sender();
let mut service_1 = unwrap!(Service::with_config(event_tx_1, config_1));
unwrap!(service_1.start_bootstrap(HashSet::new()));

unwrap!(service_1.start_listening_tcp());
let _ = expect_event!(event_rx_1, Event::ListenerStarted(port) => port);

unwrap!(service_1.start_bootstrap(HashSet::new(), CrustUser::Node));

let peer_id_0 = expect_event!(event_rx_1, Event::BootstrapConnect(peer_id, _) => peer_id);
expect_event!(event_rx_0, Event::BootstrapAccept(_peer_id));
Expand Down Expand Up @@ -386,7 +402,7 @@ fn drop_peer_when_no_message_received_within_inactivity_period() {
let (event_tx, event_rx) = get_event_sender();
let mut service = unwrap!(Service::with_config(event_tx, config));

unwrap!(service.start_bootstrap(HashSet::new()));
unwrap!(service.start_bootstrap(HashSet::new(), CrustUser::Node));
let peer_id = expect_event!(event_rx, Event::BootstrapConnect(peer_id, _) => peer_id);

// The peer should drop after inactivity.
Expand Down Expand Up @@ -416,7 +432,10 @@ fn do_not_drop_peer_even_when_no_data_messages_are_exchanged_within_inactivity_p
let (event_tx1, event_rx1) = get_event_sender();
let mut service1 = unwrap!(Service::with_config(event_tx1, config1));

unwrap!(service1.start_bootstrap(HashSet::new()));
unwrap!(service1.start_listening_tcp());
let _ = expect_event!(event_rx1, Event::ListenerStarted(port) => port);

unwrap!(service1.start_bootstrap(HashSet::new(), CrustUser::Node));
expect_event!(event_rx1, Event::BootstrapConnect(_peer_id, _));
expect_event!(event_rx0, Event::BootstrapAccept(_peer_id));

Expand Down