Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix(connectivity): use separate endpoint to test connectivity to new
Browse files Browse the repository at this point in the history
peers
  • Loading branch information
lionel-faber authored and S-Coyle committed Apr 14, 2021
1 parent a01bdc3 commit 26a2bcc
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions src/routing/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use tokio::{sync::mpsc, task};
pub(crate) struct Comm {
_quic_p2p: QuicP2p,
endpoint: Endpoint,
// Additional endpoint used to check if peers are externally reachable
connectivity_endpoint: Endpoint,
// Sender for connection events. Kept here so we can clone it and pass it to the incoming
// messages handler every time we establish new connection. It's kept in an `Option` so we can
// take it out and drop it on `terminate` which together with all the incoming message handlers
Expand All @@ -32,10 +34,10 @@ pub(crate) struct Comm {

impl Comm {
pub async fn new(
transport_config: qp2p::Config,
mut transport_config: qp2p::Config,
event_tx: mpsc::Sender<ConnectionEvent>,
) -> Result<Self> {
let quic_p2p = QuicP2p::with_config(Some(transport_config), &[], true)?;
let quic_p2p = QuicP2p::with_config(Some(transport_config.clone()), &[], true)?;

// Don't bootstrap, just create an endpoint to listen to
// the incoming messages from other nodes.
Expand All @@ -44,6 +46,13 @@ impl Comm {
let (endpoint, _incoming_connections, incoming_messages, disconnections) =
quic_p2p.new_endpoint().await?;

transport_config.local_ip = Some(endpoint.local_addr().ip());
transport_config.local_port = Some(0);
transport_config.forward_port = false;

let qp2p = QuicP2p::with_config(Some(transport_config), &[], false)?;
let (connectivity_endpoint, _, _, _) = qp2p.new_endpoint().await?;

let _ = task::spawn(handle_incoming_messages(
incoming_messages,
event_tx.clone(),
Expand All @@ -57,21 +66,29 @@ impl Comm {
Ok(Self {
_quic_p2p: quic_p2p,
endpoint,
connectivity_endpoint,
event_tx: RwLock::new(Some(event_tx)),
})
}

pub async fn bootstrap(
transport_config: qp2p::Config,
mut transport_config: qp2p::Config,
event_tx: mpsc::Sender<ConnectionEvent>,
) -> Result<(Self, SocketAddr)> {
let quic_p2p = QuicP2p::with_config(Some(transport_config), &[], true)?;
let quic_p2p = QuicP2p::with_config(Some(transport_config.clone()), &[], true)?;

// Bootstrap to the network returning the connection to a node.
// We can use the returned channels to listen for incoming messages and disconnection events
let (endpoint, _incoming_connections, incoming_messages, disconnections, bootstrap_addr) =
quic_p2p.bootstrap().await?;

transport_config.local_ip = Some(endpoint.local_addr().ip());
transport_config.local_port = Some(0);
transport_config.forward_port = false;

let qp2p = QuicP2p::with_config(Some(transport_config), &[], false)?;
let (connectivity_endpoint, _, _, _) = qp2p.new_endpoint().await?;

let _ = task::spawn(handle_incoming_messages(
incoming_messages,
event_tx.clone(),
Expand All @@ -86,6 +103,7 @@ impl Comm {
Self {
_quic_p2p: quic_p2p,
endpoint,
connectivity_endpoint,
event_tx: RwLock::new(Some(event_tx)),
},
bootstrap_addr,
Expand Down Expand Up @@ -123,7 +141,7 @@ impl Comm {

/// Tests whether the peer is reachable.
pub async fn is_reachable(&self, peer: &SocketAddr) -> Result<(), SendError> {
self.endpoint
self.connectivity_endpoint
.is_reachable(peer)
.await
.map_err(|err| {
Expand Down

0 comments on commit 26a2bcc

Please sign in to comment.