diff --git a/src/endpoint.rs b/src/endpoint.rs index d90d4e53..a1a0129b 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -248,7 +248,7 @@ impl Endpoint { } #[cfg(not(feature = "no-igd"))] - if addr.is_none() && self.qp2p_config.forward_port { + if self.qp2p_config.forward_port { // Attempt to use IGD for port forwarding match timeout( Duration::from_secs(PORT_FORWARD_TIMEOUT), @@ -268,12 +268,10 @@ impl Endpoint { } Err(e) => { info!("IGD request failed: {} - {:?}", e, e); - return Err(Error::IgdNotSupported); } }, Err(e) => { info!("IGD request timeout: {:?}", e); - return Err(Error::IgdNotSupported); } } } @@ -380,6 +378,40 @@ impl Endpoint { Ok(()) } + /// Verify if an address is publicly reachable. This will attempt to create + /// a new connection and use it to exchange a message and verify that the node + /// can be reached. + pub async fn is_reachable(&self, peer_addr: &SocketAddr) -> Result<()> { + let new_connection = self.create_new_connection(peer_addr).await?; + let (mut send_stream, mut recv_stream) = new_connection.connection.open_bi().await?; + let message = WireMsg::EndpointEchoReq; + message.write_to_stream(&mut send_stream).await?; + + match timeout( + Duration::from_secs(ECHO_SERVICE_QUERY_TIMEOUT), + WireMsg::read_from_stream(&mut recv_stream), + ) + .await + { + Ok(Ok(WireMsg::EndpointEchoResp(_))) => Ok(()), + Ok(Ok(other)) => { + info!( + "Unexpected message type when verifying reachability: {}", + &other + ); + Ok(()) + } + Ok(Err(err)) => { + info!("Unable to contact peer: {:?}", err); + Err(err) + } + Err(err) => { + info!("Unable to contact peer: {:?}", err); + Err(Error::NoEchoServiceResponse) + } + } + } + /// Creates a fresh connection without looking at the connection pool and connection duplicator. pub(crate) async fn create_new_connection( &self, diff --git a/src/tests/common.rs b/src/tests/common.rs index 204aa7df..600cb94f 100644 --- a/src/tests/common.rs +++ b/src/tests/common.rs @@ -444,3 +444,18 @@ async fn connection_attempts_to_bootstrap_contacts_should_succeed() -> Result<() } Ok(()) } + +#[tokio::test] +async fn reachability() -> Result<()> { + let qp2p = new_qp2p()?; + + let (ep1, _, _, _) = qp2p.new_endpoint().await?; + let (ep2, _, _, _) = qp2p.new_endpoint().await?; + + if let Ok(()) = ep1.is_reachable(&"127.0.0.1:12345".parse()?).await { + anyhow!("Unexpected success"); + }; + let reachable_addr = ep2.socket_addr(); + ep1.is_reachable(&reachable_addr).await?; + Ok(()) +}