Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolves #24 - Bootstrap and misc #34

Merged
merged 5 commits into from
Apr 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2019 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under the MIT license <LICENSE-MIT
// http://opensource.org/licenses/MIT> or the Modified BSD license <LICENSE-BSD
// https://opensource.org/licenses/BSD-3-Clause>, at your option. This file may not be copied,
// modified, or distributed except according to those terms. Please review the Licences for the
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use crate::connect;
use crate::context::{ctx, BootstrapGroupBuilder};

pub fn initiate() {
let (proxies, event_tx): (Vec<_>, _) = ctx(|c| {
(
c.bootstrap_cache
.peers()
.iter()
.rev()
.chain(c.bootstrap_cache.hard_coded_contacts().iter())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this will attempt to connect to hard coded contacts in parallel with the cached nodes. In case of success, I believe we'd like to postpone connection to hard coded contacts when connection to all cached nodes fails.

.cloned()
.collect(),
c.event_tx.clone(),
)
});

let bootstrap_group_builder = BootstrapGroupBuilder::new(event_tx);
for proxy in proxies {
let _ = connect::connect_to(proxy, None, Some(&bootstrap_group_builder));
}
}
13 changes: 13 additions & 0 deletions src/bootstrap_cache.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
// Copyright 2019 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under the MIT license <LICENSE-MIT
// http://opensource.org/licenses/MIT> or the Modified BSD license <LICENSE-BSD
// https://opensource.org/licenses/BSD-3-Clause>, at your option. This file may not be copied,
// modified, or distributed except according to those terms. Please review the Licences for the
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use crate::dirs::Dirs;
use crate::{Error, NodeInfo, R};
use bincode::{deserialize_from, serialize_into};
Expand Down Expand Up @@ -69,6 +78,10 @@ impl BootstrapCache {
&self.peers
}

pub fn hard_coded_contacts(&self) -> &HashSet<NodeInfo> {
&self.hard_coded_contacts
}

/// Caches given peer if it's not in hard coded contacts.
pub fn add_peer(&mut self, peer: NodeInfo) {
if self.hard_coded_contacts.contains(&peer) {
Expand Down
19 changes: 8 additions & 11 deletions src/communicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::bootstrap_cache::BootstrapCache;
use crate::context::{ctx, ctx_mut, Connection, FromPeer, ToPeer};
use crate::error::Error;
use crate::event::Event;
use crate::utils;
use crate::utils::{self, QConn};
use crate::wire_msg::{Handshake, WireMsg};
use crate::{connect, NodeInfo};
use crate::{Peer, R};
Expand All @@ -34,7 +34,7 @@ pub fn try_write_to_peer(peer: Peer, msg: WireMsg) {
let conn = c
.connections
.entry(peer_addr)
.or_insert_with(|| Connection::new(peer_addr, event_tx));
.or_insert_with(|| Connection::new(peer_addr, event_tx, None));

match conn.to_peer {
ToPeer::NoConnection => Some(msg),
Expand All @@ -45,6 +45,7 @@ pub fn try_write_to_peer(peer: Peer, msg: WireMsg) {
ToPeer::Initiated {
ref peer_cert_der,
ref mut pending_sends,
..
} => {
if *peer_cert_der != node_info.peer_cert_der {
info!("TODO Certificate we have for the peer already doesn't match with the \
Expand All @@ -62,7 +63,7 @@ pub fn try_write_to_peer(peer: Peer, msg: WireMsg) {

if connect_and_send.is_some() {
let peer_addr = node_info.peer_addr;
if let Err(e) = connect::connect_to(node_info, connect_and_send) {
if let Err(e) = connect::connect_to(node_info, connect_and_send, None) {
debug!(
"Unable to connect to peer {} to be able to send message: {:?}",
peer_addr, e
Expand Down Expand Up @@ -106,11 +107,7 @@ pub fn write_to_peer(peer_addr: SocketAddr, msg: WireMsg) {
}

/// Write to the peer, given the QUIC connection to it
pub fn write_to_peer_connection(
peer_addr: SocketAddr,
conn: &quinn::Connection,
wire_msg: WireMsg,
) {
pub fn write_to_peer_connection(peer_addr: SocketAddr, conn: &QConn, wire_msg: WireMsg) {
let leaf = conn
.open_uni()
.map_err(move |e| {
Expand Down Expand Up @@ -266,7 +263,7 @@ pub fn handle_wire_msg(peer_addr: SocketAddr, wire_msg: WireMsg) {
// TODO: Improve by not taking `inform_tx` which is necessary right now to prevent double borrow
pub fn dispatch_wire_msg(
peer: Peer,
q_conn: &quinn::Connection,
q_conn: &QConn,
inform_tx: Option<Sender<SocketAddr>>,
event_tx: &Sender<Event>,
wire_msg: WireMsg,
Expand Down Expand Up @@ -372,7 +369,7 @@ fn handle_rx_cert(peer_addr: SocketAddr, peer_cert_der: Vec<u8>) {
});

if reverse_connect_to_peer {
if let Err(e) = connect::connect_to(node_info, None) {
if let Err(e) = connect::connect_to(node_info, None, None) {
debug!(
"ERROR: Could not reverse connect to peer {}: {}",
peer_addr, e
Expand Down Expand Up @@ -401,7 +398,7 @@ fn handle_user_msg(
}
}

fn handle_echo_req(peer_addr: SocketAddr, q_conn: &quinn::Connection) {
fn handle_echo_req(peer_addr: SocketAddr, q_conn: &QConn) {
let msg = WireMsg::EndpointEchoResp(peer_addr);
write_to_peer_connection(peer_addr, q_conn, msg);
}
Expand Down
90 changes: 73 additions & 17 deletions src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,24 @@
// Software.

use crate::config::OurType;
use crate::context::{ctx_mut, Connection, FromPeer, ToPeer};
use crate::context::{ctx_mut, BootstrapGroupBuilder, Connection, FromPeer, ToPeer};
use crate::error::Error;
use crate::event::Event;
use crate::peer_config;
use crate::utils::QConn;
use crate::wire_msg::{Handshake, WireMsg};
use crate::{communicate, NodeInfo, Peer, R};
use std::mem;
use std::net::SocketAddr;
use tokio::prelude::Future;
use tokio::prelude::{Future, Stream};
use tokio::runtime::current_thread;

/// Connect to the given peer
pub fn connect_to(peer_info: NodeInfo, send_after_connect: Option<WireMsg>) -> R<()> {
pub fn connect_to(
peer_info: NodeInfo,
send_after_connect: Option<WireMsg>,
bootstrap_group_builder: Option<&BootstrapGroupBuilder>,
) -> R<()> {
let peer_addr = peer_info.peer_addr;

let peer_cfg = match peer_config::new_client_cfg(&peer_info.peer_cert_der) {
Expand All @@ -33,10 +38,16 @@ pub fn connect_to(peer_info: NodeInfo, send_after_connect: Option<WireMsg>) -> R

let r = ctx_mut(|c| {
let event_tx = c.event_tx.clone();
let conn = c
.connections
.entry(peer_addr)
.or_insert_with(|| Connection::new(peer_addr, event_tx));

let (terminator, rx) = tokio::sync::mpsc::channel::<()>(1);

let conn = c.connections.entry(peer_addr).or_insert_with(|| {
Connection::new(
peer_addr,
event_tx,
bootstrap_group_builder.map(|b| b.clone_group(peer_addr, terminator.clone())),
)
});

if conn.to_peer.is_no_connection() {
// TODO see if this can be the default from-peer for OurType::Client
Expand All @@ -46,25 +57,47 @@ pub fn connect_to(peer_info: NodeInfo, send_after_connect: Option<WireMsg>) -> R
}
conn.from_peer = FromPeer::NotNeeded;
}

// If we already had an incoming from someone we are trying to bootstrap off
if conn.bootstrap_group.is_none() {
conn.bootstrap_group =
bootstrap_group_builder.map(|b| b.clone_group(peer_addr, terminator.clone()));
}

let mut pending_sends: Vec<_> = Default::default();
if let Some(pending_send) = send_after_connect {
pending_sends.push(pending_send);
}
conn.to_peer = ToPeer::Initiated {
terminator: terminator.clone(),
peer_cert_der: peer_info.peer_cert_der,
pending_sends,
};
let peer_addr = peer_addr;
c.quic_ep()
.connect_with(peer_cfg, &peer_addr, "MaidSAFE.net")
.map_err(Error::from)
.map(move |new_client_conn_fut| {
let leaf = new_client_conn_fut.then(move |new_peer_conn_res| {
handle_new_connection_res(peer_addr, new_peer_conn_res);
Ok(())
});
.and_then(move |new_client_conn_fut| {
let terminator_leaf = rx
.map_err(move |_| {
handle_connect_err(peer_addr, &Error::ConnectionCancelled)
})
.for_each(move |_| {
handle_connect_err(peer_addr, &Error::ConnectionCancelled);
Err(())
});
let handle_new_connection_res_leaf =
new_client_conn_fut.then(move |new_peer_conn_res| {
handle_new_connection_res(peer_addr, new_peer_conn_res);
Ok::<_, ()>(())
});
let leaf = terminator_leaf
.select(handle_new_connection_res_leaf)
.then(|_| Ok(()));

current_thread::spawn(leaf);

Ok(())
})
} else {
Err(Error::DuplicateConnectionToPeer(peer_addr))
Expand All @@ -90,7 +123,9 @@ fn handle_new_connection_res(
>,
) {
let (conn_driver, q_conn, incoming_streams) = match new_peer_conn_res {
Ok((conn_driver, q_conn, incoming_streams)) => (conn_driver, q_conn, incoming_streams),
Ok((conn_driver, q_conn, incoming_streams)) => {
(conn_driver, QConn::from(q_conn), incoming_streams)
}
Err(e) => return handle_connect_err(peer_addr, &From::from(e)),
};
current_thread::spawn(
Expand Down Expand Up @@ -119,6 +154,7 @@ fn handle_new_connection_res(
ToPeer::Initiated {
ref mut peer_cert_der,
ref mut pending_sends,
..
} => (
mem::replace(peer_cert_der, Default::default()),
mem::replace(pending_sends, Default::default()),
Expand Down Expand Up @@ -157,8 +193,16 @@ fn handle_new_connection_res(
WireMsg::Handshake(Handshake::Client),
);

let peer = Peer::Node { node_info };
if let Err(e) = c.event_tx.send(Event::ConnectedTo { peer }) {
let event = if let Some(bootstrap_group) = conn.bootstrap_group.take() {
bootstrap_group.terminate_group(true);
Event::BootstrappedTo { node: node_info }
} else {
Event::ConnectedTo {
peer: node_info.into(),
}
};

if let Err(e) = c.event_tx.send(event) {
info!("Could not fire event: {:?}", e);
}

Expand All @@ -168,11 +212,23 @@ fn handle_new_connection_res(
ref mut pending_reads,
..
} => {
let peer = Peer::Node { node_info };
if let Err(e) = c.event_tx.send(Event::ConnectedTo { peer: peer.clone() }) {
let event = if let Some(bootstrap_group) = conn.bootstrap_group.take() {
bootstrap_group.terminate_group(true);
Event::BootstrappedTo {
node: node_info.clone(),
}
} else {
Event::ConnectedTo {
peer: node_info.clone().into(),
}
};

if let Err(e) = c.event_tx.send(event) {
info!("Could not fire event: {:?}", e);
}

let peer = Peer::Node { node_info };

for pending_read in pending_reads.drain(..) {
communicate::dispatch_wire_msg(
peer.clone(),
Expand Down