From 8ab10b98c887cef1c44ab7be4d7cafa14dc31fa0 Mon Sep 17 00:00:00 2001 From: Lionel Faber Date: Mon, 12 Apr 2021 19:55:03 +0530 Subject: [PATCH 1/2] feat(api): add new api to check if endpoint is externally reachable --- src/endpoint.rs | 26 ++++++++++++++++++++++++++ src/tests/common.rs | 16 ++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/src/endpoint.rs b/src/endpoint.rs index d90d4e53..bffd74e6 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -380,6 +380,32 @@ impl Endpoint { Ok(()) } + /// Verify if an address is publicly rechable. This will attempt to create + /// a new connection and use it to exchange a message and verify that the node + /// can be reached + pub async fn is_reachable(&self, peer_addr: &SocketAddr) -> Result<()> { + let new_connection = self.create_new_connection(peer_addr).await?; + let (mut send_stream, mut recv_stream) = new_connection.connection.open_bi().await?; + let message = WireMsg::EndpointEchoReq; + message.write_to_stream(&mut send_stream).await?; + + match timeout(Duration::from_secs(ECHO_SERVICE_QUERY_TIMEOUT), WireMsg::read_from_stream(&mut recv_stream)).await { + Ok(Ok(WireMsg::EndpointEchoResp(_))) => Ok(()), + Ok(Ok(other)) => { + info!("Unexpected message type when verifying reachability: {}", &other); + Ok(()) + } + Ok(Err(err)) => { + info!("Unable to contact peer: {:?}", err); + Err(err) + } + Err(err) => { + info!("Unable to contact peer: {:?}", err); + Err(Error::NoEchoServiceResponse) + } + } + } + /// Creates a fresh connection without looking at the connection pool and connection duplicator. pub(crate) async fn create_new_connection( &self, diff --git a/src/tests/common.rs b/src/tests/common.rs index 204aa7df..6c81a29d 100644 --- a/src/tests/common.rs +++ b/src/tests/common.rs @@ -444,3 +444,19 @@ async fn connection_attempts_to_bootstrap_contacts_should_succeed() -> Result<() } Ok(()) } + +#[tokio::test] +async fn reachability() -> Result<()> { + let qp2p = new_qp2p()?; + + let (ep1, _, _, _) = qp2p.new_endpoint().await?; + let (ep2, _, _, _) = qp2p.new_endpoint().await?; + + if let Ok(()) = ep1.is_reachable(&"127.0.0.1:12345".parse()?).await { + anyhow!("Unexpected success"); + }; + let reachable_addr = ep2.socket_addr(); + ep1.is_reachable(&reachable_addr).await?; + Ok(()) + +} \ No newline at end of file From b4b60ed1a3ba9c7fed0bebd295d4efebbd0dc360 Mon Sep 17 00:00:00 2001 From: Lionel Faber Date: Tue, 13 Apr 2021 15:01:30 +0530 Subject: [PATCH 2/2] fix(igd): run igd even if echo service succeedes - also addresses some review comments --- src/endpoint.rs | 22 ++++++++++++++-------- src/tests/common.rs | 5 ++--- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/endpoint.rs b/src/endpoint.rs index bffd74e6..a1a0129b 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -248,7 +248,7 @@ impl Endpoint { } #[cfg(not(feature = "no-igd"))] - if addr.is_none() && self.qp2p_config.forward_port { + if self.qp2p_config.forward_port { // Attempt to use IGD for port forwarding match timeout( Duration::from_secs(PORT_FORWARD_TIMEOUT), @@ -268,12 +268,10 @@ impl Endpoint { } Err(e) => { info!("IGD request failed: {} - {:?}", e, e); - return Err(Error::IgdNotSupported); } }, Err(e) => { info!("IGD request timeout: {:?}", e); - return Err(Error::IgdNotSupported); } } } @@ -380,19 +378,27 @@ impl Endpoint { Ok(()) } - /// Verify if an address is publicly rechable. This will attempt to create + /// Verify if an address is publicly reachable. This will attempt to create /// a new connection and use it to exchange a message and verify that the node - /// can be reached + /// can be reached. pub async fn is_reachable(&self, peer_addr: &SocketAddr) -> Result<()> { let new_connection = self.create_new_connection(peer_addr).await?; let (mut send_stream, mut recv_stream) = new_connection.connection.open_bi().await?; let message = WireMsg::EndpointEchoReq; message.write_to_stream(&mut send_stream).await?; - - match timeout(Duration::from_secs(ECHO_SERVICE_QUERY_TIMEOUT), WireMsg::read_from_stream(&mut recv_stream)).await { + + match timeout( + Duration::from_secs(ECHO_SERVICE_QUERY_TIMEOUT), + WireMsg::read_from_stream(&mut recv_stream), + ) + .await + { Ok(Ok(WireMsg::EndpointEchoResp(_))) => Ok(()), Ok(Ok(other)) => { - info!("Unexpected message type when verifying reachability: {}", &other); + info!( + "Unexpected message type when verifying reachability: {}", + &other + ); Ok(()) } Ok(Err(err)) => { diff --git a/src/tests/common.rs b/src/tests/common.rs index 6c81a29d..600cb94f 100644 --- a/src/tests/common.rs +++ b/src/tests/common.rs @@ -457,6 +457,5 @@ async fn reachability() -> Result<()> { }; let reachable_addr = ep2.socket_addr(); ep1.is_reachable(&reachable_addr).await?; - Ok(()) - -} \ No newline at end of file + Ok(()) +}