56 changes: 40 additions & 16 deletions src/connect.rs
Expand Up @@ -33,7 +33,7 @@ pub fn connect_to(
let r = ctx_mut(|c| {
let event_tx = c.event_tx.clone();

let (terminator, mut rx) = utils::connect_terminator();
let (terminator, mut terminator_rx) = utils::connect_terminator();

let conn = c.connections.entry(node_addr).or_insert_with(|| {
Connection::new(
Expand Down Expand Up @@ -77,7 +77,7 @@ pub fn connect_to(
let _ = tokio::spawn(async move {
select! {
// Terminator leaf
_ = rx.recv().fuse() => {
_ = terminator_rx.recv().fuse() => {
handle_connect_err(node_addr, &QuicP2pError::ConnectionCancelled);
},
// New connection
Expand All @@ -92,6 +92,26 @@ pub fn connect_to(

Ok(())
} else {
if let Some((pending_send, token)) = send_after_connect {
debug!("Duplicate connection. Will send pending messages.");
match conn.to_peer {
ToPeer::Established { ref q_conn } => {
communicate::write_to_peer_connection(
Peer::Node(node_addr),
q_conn,
pending_send,
token,
);
}
ToPeer::Initiated {
ref mut pending_sends,
..
} => {
pending_sends.push((pending_send, token));
}
_ => {}
}
}
Err(QuicP2pError::DuplicateConnectionToPeer {
peer_addr: node_addr,
})
Expand Down Expand Up @@ -170,17 +190,19 @@ fn handle_new_connection_res(
0,
);

let event = if let Some(bootstrap_group_ref) = conn.bootstrap_group_ref.take() {
if let Some(mut bootstrap_group_ref) = conn.bootstrap_group_ref.take() {
if let Err(e) =
bootstrap_group_ref.send(Event::BootstrappedTo { node: peer_addr })
{
info!("Could not fire event: {:?}", e);
}
terminate_bootstrap_group = Some(bootstrap_group_ref);
Event::BootstrappedTo { node: peer_addr }
} else {
Event::ConnectedTo {
if let Err(e) = c.event_tx.send(Event::ConnectedTo {
peer: Peer::Node(peer_addr),
}) {
info!("Could not fire event: {:?}", e);
}
};

if let Err(e) = c.event_tx.send(event) {
info!("Could not fire event: {:?}", e);
}

should_accept_incoming = true;
Expand All @@ -189,17 +211,19 @@ fn handle_new_connection_res(
ref mut pending_reads,
..
} => {
let event = if let Some(bootstrap_group_ref) = conn.bootstrap_group_ref.take() {
if let Some(mut bootstrap_group_ref) = conn.bootstrap_group_ref.take() {
if let Err(e) =
bootstrap_group_ref.send(Event::BootstrappedTo { node: peer_addr })
{
info!("Could not fire event: {:?}", e);
}
terminate_bootstrap_group = Some(bootstrap_group_ref);
Event::BootstrappedTo { node: peer_addr }
} else {
Event::ConnectedTo {
if let Err(e) = c.event_tx.send(Event::ConnectedTo {
peer: Peer::Node(peer_addr),
}) {
info!("Could not fire event: {:?}", e);
}
};

if let Err(e) = c.event_tx.send(event) {
info!("Could not fire event: {:?}", e);
}

for pending_read in pending_reads.drain(..) {
Expand Down
36 changes: 35 additions & 1 deletion src/connection/bootstrap_group.rs
Expand Up @@ -18,6 +18,7 @@ use crate::context::ctx_mut;
use crate::event::Event;
use crate::utils::ConnectTerminator;
use crate::EventSenders;
use crossbeam_channel as mpmc;
use log::info;
use std::cell::RefCell;
use std::collections::HashMap;
Expand All @@ -28,7 +29,7 @@ use std::rc::Rc;
/// Creator of a `BootstrapGroup`. Use this to obtain the reference to the undelying group.
///
/// Destroy the maker once all references of the group have been obtained to not hold the internal
/// references for longer than needed. The maker going out of scope is enough for it's destruction.
/// references for longer than needed. The maker going out of scope is enough for its destruction.
pub struct BootstrapGroupMaker {
group: Rc<RefCell<BootstrapGroup>>,
}
Expand Down Expand Up @@ -66,6 +67,34 @@ impl BootstrapGroupMaker {
group: self.group.clone(),
}
}

/// Notify this group that it's already been boostrapped.
pub fn set_already_bootstrapped(&mut self, peer_addr: SocketAddr) {
// Drop connections for the rest of the group.
let mut terminators = {
let mut group = self.group.borrow_mut();
group.is_bootstrap_successful_yet = true;
mem::take(&mut group.terminators)
};

ctx_mut(|c| {
for (peer_addr, mut terminator) in terminators.drain() {
let _ = terminator.try_send(());
let _conn = c.connections.remove(&peer_addr);
}
});

// Notify about successful bootstrap.
if let Err(e) = self
.group
.borrow_mut()
.event_tx
.node_tx
.send(Event::BootstrappedTo { node: peer_addr })
{
info!("Failed informing about bootstrap success: {:?}", e);
}
}
}

/// Reference to the underlying `BootstrapGroup`.
Expand All @@ -85,6 +114,7 @@ impl BootstrapGroupRef {
pub fn terminate_group(&self, is_due_to_success: bool) {
let mut terminators = {
let mut group = self.group.borrow_mut();

if is_due_to_success {
group.is_bootstrap_successful_yet = true;
}
Expand All @@ -107,6 +137,10 @@ impl BootstrapGroupRef {
pub fn is_bootstrap_successful_yet(&self) -> bool {
self.group.borrow().is_bootstrap_successful_yet
}

pub fn send(&mut self, event: Event) -> Result<(), mpmc::SendError<Event>> {
self.group.borrow_mut().event_tx.node_tx.send(event)
}
}

impl Drop for BootstrapGroupRef {
Expand Down
2 changes: 1 addition & 1 deletion src/connection/mod.rs
Expand Up @@ -41,7 +41,7 @@ pub struct Connection {
/// end of this connection.
pub we_contacted_peer: bool,
peer_addr: SocketAddr,
event_tx: EventSenders,
pub(crate) event_tx: EventSenders,
}

impl Connection {
Expand Down
21 changes: 14 additions & 7 deletions src/error.rs
Expand Up @@ -9,9 +9,8 @@

use err_derive::Error;

use std::io;
use std::net::SocketAddr;
use std::sync::mpsc;
use std::{io, sync::mpsc};

#[derive(Debug, Error)]
#[allow(missing_docs)]
Expand All @@ -34,7 +33,7 @@ pub enum QuicP2pError {
DuplicateConnectionToPeer { peer_addr: SocketAddr },
#[error(display = "Could not find enpoint server")]
NoEndpointEchoServerFound,
#[error(display = "Oneshot receiver ")]
#[error(display = "Oneshot receiver")]
OneShotRx(#[source] tokio::sync::oneshot::error::RecvError),
#[error(display = "TLS Error ")]
TLS(#[source] rustls::TLSError),
Expand All @@ -48,16 +47,24 @@ pub enum QuicP2pError {
OperationNotAllowed,
#[error(display = "Connection cancelled")]
ConnectionCancelled,
#[error(display = "Channel receive error ")]
#[error(display = "MPMC channel receive error")]
MultiChannelRecv(#[source] crossbeam_channel::RecvError),
#[error(display = "Channel receive error")]
ChannelRecv(#[source] mpsc::RecvError),
#[error(display = "Could not add certificate to PKI")]
WebPki,
#[error(display = "Invalid wire message.")]
InvalidWireMsgFlag,
#[error(display = "Stream write error ")]
#[error(display = "Stream write error")]
WriteError(#[source] quinn::WriteError),
#[error(display = "Read to end error ")]
#[error(display = "Read to end error")]
ReadToEndError(#[source] quinn::ReadToEndError),
#[error(display = "Could not add certificate ")]
#[error(display = "Could not add certificate")]
AddCertificateError(#[source] quinn::ParseError),
#[error(display = "Could not use IGD for automatic port forwarding")]
IgdAddPort(#[source] igd::AddAnyPortError),
#[error(display = "Could not find the gateway device for IGD")]
IgdSearch(#[source] igd::SearchError),
#[error(display = "IGD is not supported")]
IgdNotSupported,
}
18 changes: 17 additions & 1 deletion src/event_loop.rs
Expand Up @@ -8,8 +8,8 @@
// Software.

use log::{debug, warn};
use std::fmt;
use std::thread::{self, JoinHandle};
use std::{fmt, sync};
use tokio::sync::mpsc::{self, UnboundedSender};
use unwrap::unwrap;

Expand Down Expand Up @@ -105,6 +105,22 @@ impl EventLoop {
{
post(&mut self.tx, f)
}

/// Post messages to event loop and return a result sent through a channel
pub fn post_and_return<F, T>(&mut self, f: F) -> Result<T, sync::mpsc::RecvError>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = sync::mpsc::channel();
self.post(move || {
let res = f();
if let Err(e) = tx.send(res) {
warn!("Error trying to send a result: {:?}", e);
}
});
rx.recv()
}
}

impl Drop for EventLoop {
Expand Down
116 changes: 116 additions & 0 deletions src/igd.rs
@@ -0,0 +1,116 @@
// Copyright 2020 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under the MIT license <LICENSE-MIT
// http://opensource.org/licenses/MIT> or the Modified BSD license <LICENSE-BSD
// https://opensource.org/licenses/BSD-3-Clause>, at your option. This file may not be copied,
// modified, or distributed except according to those terms. Please review the Licences for the
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use crate::error::QuicP2pError;
use crate::utils::R;
use log::{debug, info, warn};
use std::net::{IpAddr, SocketAddrV4};
use std::time::Duration;
use tokio::time::{self, Instant};

/// Default duration of a UPnP lease, in seconds.
pub const DEFAULT_UPNP_LEASE_DURATION_SEC: u32 = 120;

/// Automatically forwards a port and setups a tokio task to renew it periodically.
pub async fn forward_port(local_port: u16, lease_duration: u32) -> R<SocketAddrV4> {
let igd_res = add_port(local_port, lease_duration).await;

if let Ok(ref ext_sock_addr) = igd_res {
// Start a tokio task to renew the lease periodically.
let ext_port = ext_sock_addr.port();
let lease_interval = Duration::from_secs(lease_duration.into());

let _ = tokio::spawn(async move {
let mut timer =
time::interval_at(Instant::now() + lease_interval.into(), lease_interval);

loop {
let _ = timer.tick().await;
debug!("Renewing IGD lease for port {}", local_port);

let renew_res = renew_port(local_port, ext_port, lease_duration).await;
match renew_res {
Ok(()) => {}
Err(e) => {
warn!("Failed to renew IGD lease: {} - {:?}", e, e);
}
}
}
});
}

igd_res
}

/// Attempts to map an external port to a local address.
///
/// `local_port` is the local listener's port that all requests will be redirected to.
/// An external port is chosen randomly and returned as a result.
///
/// `lease_duration` is the life time of a port mapping (in seconds). If it is 0, the
/// mapping will continue to exist as long as possible.
pub(crate) async fn add_port(local_port: u16, lease_duration: u32) -> R<SocketAddrV4> {
let gateway = igd::aio::search_gateway(Default::default()).await?;

debug!("Found IGD gateway: {:?}", gateway);

// Find our local IP address by connecting to the gateway and querying local socket address.
let gateway_conn = tokio::net::TcpStream::connect(gateway.addr).await?;
let local_sa = gateway_conn.local_addr()?;

if let IpAddr::V4(ip) = local_sa.ip() {
let local_sa = SocketAddrV4::new(ip, local_port);

let ext_addr = gateway
.get_any_address(
igd::PortMappingProtocol::UDP,
local_sa,
lease_duration,
"MaidSafe.net",
)
.await?;

debug!("Our external address is {:?}", ext_addr);

Ok(ext_addr)
} else {
info!("IPv6 for IGD is not supported");
Err(QuicP2pError::IgdNotSupported)
}
}

/// Renews the lease for a specified external port.
pub(crate) async fn renew_port(local_port: u16, ext_port: u16, lease_duration: u32) -> R<()> {
let gateway = igd::aio::search_gateway(Default::default()).await?;

// Find our local IP address by connecting to the gateway and querying local socket address.
let gateway_conn = tokio::net::TcpStream::connect(gateway.addr).await?;
let local_sa = gateway_conn.local_addr()?;

if let IpAddr::V4(ip) = local_sa.ip() {
let local_sa = SocketAddrV4::new(ip, local_port);

let _res = gateway
.add_port(
igd::PortMappingProtocol::UDP,
ext_port,
local_sa,
lease_duration,
"MaidSafe.net",
)
.await;

debug!("Successfully renewed the port mapping");

Ok(())
} else {
info!("IPv6 for IGD is not supported");
Err(QuicP2pError::IgdNotSupported)
}
}
339 changes: 190 additions & 149 deletions src/lib.rs

Large diffs are not rendered by default.

35 changes: 18 additions & 17 deletions src/listener.rs
Expand Up @@ -17,7 +17,7 @@ use crate::{
};
use futures::future::{self, TryFutureExt};
use futures::stream::StreamExt;
use log::{debug, info};
use log::{debug, info, trace};

/// Start listening
pub fn listen(incoming_connections: quinn::Incoming) {
Expand Down Expand Up @@ -50,6 +50,8 @@ fn handle_new_conn(

let peer_addr = q_conn.remote_address();

trace!("Incoming connection from peer: {}", peer_addr);

let state = ctx_mut(|c| {
let event_tx = c.event_tx.clone();
let conn = c
Expand All @@ -62,32 +64,31 @@ fn handle_new_conn(
pending_reads: Default::default(),
};

let bootstrap_group = if let ToPeer::Established { .. } = conn.to_peer {
let mut bootstrap_group = None;

if let ToPeer::Established { .. } = conn.to_peer {
// TODO come back to all the connected-to events and see if we are handling all
// cases
let event = if let Some(bootstrap_group_ref) = conn.bootstrap_group_ref.take() {
if let Some(mut bootstrap_group_ref) = conn.bootstrap_group_ref.take() {
if bootstrap_group_ref.is_bootstrap_successful_yet() {
return Action::HandleAlreadyBootstrapped;
}
bootstrap_group = Some(bootstrap_group_ref);
Event::BootstrappedTo { node: peer_addr }

if let Err(e) =
bootstrap_group_ref.send(Event::BootstrappedTo { node: peer_addr })
{
info!("ERROR in informing user about a new peer: {:?} - {}", e, e);
}

return Action::Continue(Some(bootstrap_group_ref));
} else {
Event::ConnectedTo {
if let Err(e) = c.event_tx.send(Event::ConnectedTo {
peer: Peer::Node(peer_addr),
}) {
info!("ERROR in informing user about a new peer: {:?} - {}", e, e);
}
};

if let Err(e) = c.event_tx.send(event) {
info!("ERROR in informing user about a new peer: {:?} - {}", e, e);
}

bootstrap_group
} else {
None
};
Action::Continue(bootstrap_group)

Action::Continue(None)
} else {
Action::HandleDuplicate(q_conn)
}
Expand Down
84 changes: 11 additions & 73 deletions src/test_utils.rs
Expand Up @@ -12,19 +12,16 @@ use crate::connection::{Connection, FromPeer, QConn, ToPeer};
use crate::context::ctx;
use crate::context::Context;
use crate::dirs::{Dirs, OverRide};
use crate::event::Event;
use crate::utils::{Token, R};
use crate::utils::{new_unbounded_channels, EventReceivers, Token, R};
use crate::wire_msg::WireMsg;
use crate::{communicate, Builder, EventSenders, Peer, QuicP2p};
use crossbeam_channel as mpmc;
use crate::{communicate, Peer, QuicP2p};
use futures::future::Future;
use rand::Rng;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf;
use std::sync::mpsc;
use std::{env, ops::Deref, time::Duration};
use std::{env, ops::Deref, sync::mpsc, time::Duration};
use unwrap::unwrap;

thread_local! {
Expand Down Expand Up @@ -172,77 +169,13 @@ fn tmp_rand_dir() -> PathBuf {
path
}

pub(crate) struct EventReceivers {
pub node_rx: mpmc::Receiver<Event>,
pub client_rx: mpmc::Receiver<Event>,
}

impl EventReceivers {
pub fn recv(&self) -> Result<Event, mpmc::RecvError> {
let mut sel = mpmc::Select::new();
let client_idx = sel.recv(&self.client_rx);
let node_idx = sel.recv(&self.node_rx);
let selected_operation = sel.ready();

if selected_operation == client_idx {
self.client_rx.recv()
} else if selected_operation == node_idx {
self.node_rx.recv()
} else {
panic!("invalid operation");
}
}

pub fn try_recv(&self) -> Result<Event, mpmc::TryRecvError> {
self.node_rx
.try_recv()
.or_else(|_| self.client_rx.try_recv())
}

pub fn iter(&self) -> IterEvent {
IterEvent { event_rx: &self }
}
}

pub struct IterEvent<'a> {
event_rx: &'a EventReceivers,
}

impl<'a> Iterator for IterEvent<'a> {
type Item = Event;

fn next(&mut self) -> Option<Self::Item> {
let mut sel = mpmc::Select::new();
let client_idx = sel.recv(&self.event_rx.client_rx);
let node_idx = sel.recv(&self.event_rx.node_rx);
let selected_operation = sel.ready();

let event = if selected_operation == client_idx {
self.event_rx.client_rx.recv()
} else if selected_operation == node_idx {
self.event_rx.node_rx.recv()
} else {
return None;
};

event.ok()
}
}

pub(crate) fn new_unbounded_channels() -> (EventSenders, EventReceivers) {
let (client_tx, client_rx) = mpmc::unbounded();
let (node_tx, node_rx) = mpmc::unbounded();
(
EventSenders { node_tx, client_tx },
EventReceivers { node_rx, client_rx },
)
}

/// Creates a new `QuicP2p` instance for testing.
pub(crate) fn new_random_qp2p(
is_addr_unspecified: bool,
contacts: HashSet<SocketAddr>,
) -> (QuicP2p, EventReceivers) {
let _ = env_logger::try_init();

let (tx, rx) = new_unbounded_channels();
let qp2p = {
let mut cfg = Config::with_default_cert();
Expand All @@ -251,7 +184,12 @@ pub(crate) fn new_random_qp2p(
if !is_addr_unspecified {
cfg.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
}
unwrap!(Builder::new(tx).with_config(cfg).build())
unwrap!(QuicP2p::with_config(
tx,
Some(cfg),
Default::default(),
false
))
};

(qp2p, rx)
Expand Down
89 changes: 88 additions & 1 deletion src/utils.rs
Expand Up @@ -7,14 +7,18 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use crate::{ctx_mut, dirs::Dirs, error::QuicP2pError, event::Event, peer::Peer};
#![allow(unused)]

use crate::{ctx_mut, dirs::Dirs, error::QuicP2pError, event::Event, peer::Peer, EventSenders};
use crossbeam_channel as mpmc;
use log::debug;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::net::SocketAddr;
use std::path::Path;
use std::time::Duration;

/// Result used by `QuicP2p`.
pub type R<T> = Result<T, QuicP2pError>;
Expand All @@ -28,6 +32,89 @@ pub fn connect_terminator() -> (ConnectTerminator, tokio::sync::mpsc::Receiver<(
tokio::sync::mpsc::channel(1)
}

pub(crate) struct EventReceivers {
pub node_rx: mpmc::Receiver<Event>,
pub client_rx: mpmc::Receiver<Event>,
}

impl EventReceivers {
pub fn recv(&self) -> Result<Event, mpmc::RecvError> {
let mut sel = mpmc::Select::new();
let client_idx = sel.recv(&self.client_rx);
let node_idx = sel.recv(&self.node_rx);
let selected_operation = sel.ready();

if selected_operation == client_idx {
self.client_rx.recv()
} else if selected_operation == node_idx {
self.node_rx.recv()
} else {
panic!("invalid operation");
}
}

pub fn recv_timeout(&self, timeout: Duration) -> Result<Event, QuicP2pError> {
let mut sel = mpmc::Select::new();
let client_idx = sel.recv(&self.client_rx);
let node_idx = sel.recv(&self.node_rx);
let selected_operation = sel.ready_timeout(timeout).map_err(|_| mpmc::RecvError)?;

if selected_operation == client_idx {
Ok(self.client_rx.recv()?)
} else if selected_operation == node_idx {
Ok(self.node_rx.recv()?)
} else {
panic!("invalid operation");
}
}

#[allow(unused)]
pub fn try_recv(&self) -> Result<Event, mpmc::TryRecvError> {
self.node_rx
.try_recv()
.or_else(|_| self.client_rx.try_recv())
}

#[allow(unused)]
pub fn iter(&self) -> IterEvent {
IterEvent { event_rx: &self }
}
}

pub struct IterEvent<'a> {
event_rx: &'a EventReceivers,
}

impl<'a> Iterator for IterEvent<'a> {
type Item = Event;

fn next(&mut self) -> Option<Self::Item> {
let mut sel = mpmc::Select::new();
let client_idx = sel.recv(&self.event_rx.client_rx);
let node_idx = sel.recv(&self.event_rx.node_rx);
let selected_operation = sel.ready();

let event = if selected_operation == client_idx {
self.event_rx.client_rx.recv()
} else if selected_operation == node_idx {
self.event_rx.node_rx.recv()
} else {
return None;
};

event.ok()
}
}

pub(crate) fn new_unbounded_channels() -> (EventSenders, EventReceivers) {
let (client_tx, client_rx) = mpmc::unbounded();
let (node_tx, node_rx) = mpmc::unbounded();
(
EventSenders { node_tx, client_tx },
EventReceivers { node_rx, client_rx },
)
}

/// Get the project directory
#[cfg(any(
all(
Expand Down
4 changes: 2 additions & 2 deletions src/wire_msg.rs
Expand Up @@ -14,7 +14,7 @@ use std::{fmt, net::SocketAddr};
use unwrap::unwrap;

/// Final type serialised and sent on the wire by QuicP2p
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum WireMsg {
Handshake(Handshake),
EndpointEchoReq,
Expand Down Expand Up @@ -70,7 +70,7 @@ impl fmt::Display for WireMsg {
/// passive connection from a peer will allow only incoming uni-directional streams from it.
///
/// Depending on the handshake we will categorise the peer and give this information to the user.
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum Handshake {
/// The connecting peer is a node.
Node,
Expand Down
15 changes: 9 additions & 6 deletions tests/quic_p2p.rs
@@ -1,5 +1,5 @@
use crossbeam_channel as mpmc;
use quic_p2p::{Builder, Config, Event, EventSenders, OurType, Peer, QuicP2p};
use quic_p2p::{Config, Event, EventSenders, OurType, Peer, QuicP2p};
use std::{
collections::HashSet,
net::{IpAddr, Ipv4Addr, SocketAddr},
Expand Down Expand Up @@ -70,17 +70,20 @@ fn test_peer_with_hcc(
our_type: OurType,
) -> (QuicP2p, EventReceivers) {
let (ev_tx, ev_rx) = new_unbounded_channels();
let builder = Builder::new(ev_tx)
.with_config(Config {
let qp2p = unwrap!(QuicP2p::with_config(
ev_tx,
Some(Config {
port: Some(0),
ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
hard_coded_contacts,
our_type,
..Default::default()
})
}),
// Make sure we start with an empty cache. Otherwise, we might get into unexpected state.
.with_bootstrap_nodes(Default::default(), true);
(unwrap!(builder.build()), ev_rx)
Default::default(),
true,
));
(qp2p, ev_rx)
}

#[test]
Expand Down