diff --git a/examples/echo_service.rs b/examples/echo_service.rs index d673ca3e..37973b3f 100644 --- a/examples/echo_service.rs +++ b/examples/echo_service.rs @@ -23,7 +23,7 @@ async fn main() -> Result<(), Error> { &bootstrap_nodes, false, )?; - let mut endpoint = qp2p.new_endpoint()?; + let endpoint = qp2p.new_endpoint()?; let socket_addr = endpoint.socket_addr().await?; println!("Process running at: {}", &socket_addr); if genesis { diff --git a/src/api.rs b/src/api.rs index f3b1d5c6..d8a8c3ee 100644 --- a/src/api.rs +++ b/src/api.rs @@ -71,7 +71,7 @@ pub struct QuicP2p { bootstrap_cache: BootstrapCache, endpoint_cfg: quinn::ServerConfig, client_cfg: quinn::ClientConfig, - upnp_lease_duration: u32, + qp2p_config: Config, } impl QuicP2p { @@ -157,6 +157,8 @@ impl QuicP2p { .clone() .map(|custom_dir| Dirs::Overide(OverRide::new(&custom_dir))); + let mut qp2p_config = cfg.clone(); + let mut bootstrap_cache = BootstrapCache::new(cfg.hard_coded_contacts, custom_dirs.as_ref())?; if use_bootstrap_cache { @@ -178,13 +180,19 @@ impl QuicP2p { .upnp_lease_duration .unwrap_or(DEFAULT_UPNP_LEASE_DURATION_SEC); + qp2p_config.ip = Some(ip); + qp2p_config.port = Some(port); + qp2p_config.keep_alive_interval_msec = Some(keep_alive_interval_msec); + qp2p_config.idle_timeout_msec = Some(idle_timeout_msec); + qp2p_config.upnp_lease_duration = Some(upnp_lease_duration); + Ok(Self { local_addr: SocketAddr::new(ip, port), allow_random_port, bootstrap_cache, endpoint_cfg, client_cfg, - upnp_lease_duration, + qp2p_config, }) } @@ -209,7 +217,7 @@ impl QuicP2p { /// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST)); /// config.port = Some(3000); /// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?; - /// let mut endpoint = quic_p2p.new_endpoint()?; + /// let endpoint = quic_p2p.new_endpoint()?; /// let peer_addr = endpoint.socket_addr().await?; /// /// config.port = Some(3001); @@ -233,12 +241,12 @@ impl QuicP2p { // Attempt to connect to all nodes and return the first one to succeed let mut tasks = Vec::default(); for node_addr in bootstrap_nodes.iter().cloned() { + let qp2p_config = self.qp2p_config.clone(); let nodes = bootstrap_nodes.clone(); let endpoint_cfg = self.endpoint_cfg.clone(); let client_cfg = self.client_cfg.clone(); let local_addr = self.local_addr; let allow_random_port = self.allow_random_port; - let upnp_lease_duration = self.upnp_lease_duration; let task_handle = tokio::spawn(async move { new_connection_to( &node_addr, @@ -246,8 +254,8 @@ impl QuicP2p { client_cfg, local_addr, allow_random_port, - upnp_lease_duration, nodes, + qp2p_config, ) .await }); @@ -278,7 +286,7 @@ impl QuicP2p { /// let mut config = Config::default(); /// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST)); /// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?; - /// let mut peer_1 = quic_p2p.new_endpoint()?; + /// let peer_1 = quic_p2p.new_endpoint()?; /// let peer1_addr = peer_1.socket_addr().await?; /// /// let (peer_2, connection) = quic_p2p.connect_to(&peer1_addr).await?; @@ -301,8 +309,8 @@ impl QuicP2p { self.client_cfg.clone(), self.local_addr, self.allow_random_port, - self.upnp_lease_duration, bootstrap_nodes, + self.qp2p_config.clone(), ) .await } @@ -349,8 +357,8 @@ impl QuicP2p { quinn_endpoint, quinn_incoming, self.client_cfg.clone(), - self.upnp_lease_duration, bootstrap_nodes, + self.qp2p_config.clone(), )?; Ok(endpoint) @@ -366,8 +374,8 @@ async fn new_connection_to( client_cfg: quinn::ClientConfig, local_addr: SocketAddr, allow_random_port: bool, - upnp_lease_duration: u32, bootstrap_nodes: Vec, + qp2p_config: Config, ) -> Result<(Endpoint, Connection)> { trace!("Attempting to connect to peer: {}", node_addr); @@ -379,8 +387,8 @@ async fn new_connection_to( quinn_endpoint, quinn_incoming, client_cfg, - upnp_lease_duration, bootstrap_nodes, + qp2p_config, )?; let connection = endpoint.connect_to(node_addr).await?; diff --git a/src/config.rs b/src/config.rs index f4328645..fac2ace7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -63,6 +63,9 @@ pub struct Config { /// Duration of a UPnP port mapping. #[structopt(long)] pub upnp_lease_duration: Option, + /// Specify if port forwarding via UPnP should be done or not + #[structopt(long)] + pub forward_port: bool, } impl Config { diff --git a/src/connections.rs b/src/connections.rs index 94f84528..1ff5bc01 100644 --- a/src/connections.rs +++ b/src/connections.rs @@ -48,7 +48,7 @@ impl Connection { /// let mut config = Config::default(); /// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST)); /// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?; - /// let mut peer_1 = quic_p2p.new_endpoint()?; + /// let peer_1 = quic_p2p.new_endpoint()?; /// let peer1_addr = peer_1.socket_addr().await?; /// /// let (peer_2, connection) = quic_p2p.connect_to(&peer1_addr).await?; @@ -74,7 +74,7 @@ impl Connection { /// let mut config = Config::default(); /// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST)); /// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?; - /// let mut peer_1 = quic_p2p.new_endpoint()?; + /// let peer_1 = quic_p2p.new_endpoint()?; /// let peer1_addr = peer_1.socket_addr().await?; /// /// let (peer_2, connection) = quic_p2p.connect_to(&peer1_addr).await?; diff --git a/src/endpoint.rs b/src/endpoint.rs index ec1fb269..e2d60d70 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -13,11 +13,13 @@ use super::wire_msg::WireMsg; use super::{ connections::{Connection, IncomingConnections}, error::Result, + Config, }; -use futures::{lock::Mutex, FutureExt}; +use futures::lock::Mutex; use log::trace; use log::{debug, info}; -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; +use tokio::time::timeout; /// Host name of the Quic communication certificate used by peers // FIXME: make it configurable @@ -30,8 +32,8 @@ pub struct Endpoint { quic_endpoint: quinn::Endpoint, quic_incoming: Arc>, client_cfg: quinn::ClientConfig, - upnp_lease_duration: u32, bootstrap_nodes: Vec, + qp2p_config: Config, } impl Endpoint { @@ -39,8 +41,8 @@ impl Endpoint { quic_endpoint: quinn::Endpoint, quic_incoming: quinn::Incoming, client_cfg: quinn::ClientConfig, - upnp_lease_duration: u32, bootstrap_nodes: Vec, + qp2p_config: Config, ) -> Result { let local_addr = quic_endpoint.local_addr()?; Ok(Self { @@ -48,8 +50,8 @@ impl Endpoint { quic_endpoint, quic_incoming: Arc::new(Mutex::new(quic_incoming)), client_cfg, - upnp_lease_duration, bootstrap_nodes, + qp2p_config, }) } @@ -60,7 +62,7 @@ impl Endpoint { /// Returns the socket address of the endpoint pub async fn socket_addr(&self) -> Result { - if cfg!(test) { + if cfg!(test) || !self.qp2p_config.forward_port { self.local_addr().await } else { self.public_addr().await @@ -83,7 +85,17 @@ impl Endpoint { let mut addr = None; // Attempt to use IGD for port forwarding - match tokio::time::timeout(std::time::Duration::from_secs(30), forward_port(self.local_addr, self.upnp_lease_duration)).await { + match timeout( + Duration::from_secs(30), + forward_port( + self.local_addr, + self.qp2p_config.upnp_lease_duration.ok_or_else(|| { + Error::Unexpected("Missing UPnP config parameter".to_string()) + })?, + ), + ) + .await + { Ok(res) => { match res { Ok(public_sa) => { @@ -103,32 +115,28 @@ impl Endpoint { } // Try to contact an echo service - match tokio::time::timeout(std::time::Duration::from_secs(30), self.query_ip_echo_service()).await { - Ok(res) => { - match res { - Ok(echo_res) => match addr { - None => { - addr = Some(echo_res); - } - Some(address) => { - info!("Got response from echo service: {:?}, but IGD has already provided our external address: {:?}", echo_res, address); - } - }, - Err(err) => { - info!("Could not contact echo service: {} - {:?}", err, err); + match timeout(Duration::from_secs(30), self.query_ip_echo_service()).await { + Ok(res) => match res { + Ok(echo_res) => match addr { + None => { + addr = Some(echo_res); + } + Some(address) => { + info!("Got response from echo service: {:?}, but IGD has already provided our external address: {:?}", echo_res, address); } + }, + Err(err) => { + info!("Could not contact echo service: {} - {:?}", err, err); } }, - Err(e) => { - info!("Echo service timed out: {:?}", e) - } + Err(e) => info!("Echo service timed out: {:?}", e), } if let Some(socket_addr) = addr { Ok(socket_addr) } else { Err(Error::Unexpected( "No response from echo service".to_string(), - )) + )) } } diff --git a/src/tests/common.rs b/src/tests/common.rs index 8ba109d8..fc2b60ff 100644 --- a/src/tests/common.rs +++ b/src/tests/common.rs @@ -33,10 +33,10 @@ fn random_msg() -> Bytes { #[tokio::test] async fn successful_connection() -> Result<()> { let qp2p = new_qp2p(); - let mut peer1 = qp2p.new_endpoint()?; + let peer1 = qp2p.new_endpoint()?; let peer1_addr = peer1.socket_addr().await?; - let mut peer2 = qp2p.new_endpoint()?; + let peer2 = qp2p.new_endpoint()?; let _connection = peer2.connect_to(&peer1_addr).await?; let mut incoming_conn = peer1.listen()?; @@ -53,7 +53,7 @@ async fn successful_connection() -> Result<()> { #[tokio::test] async fn bi_directional_streams() -> Result<()> { let qp2p = new_qp2p(); - let mut peer1 = qp2p.new_endpoint()?; + let peer1 = qp2p.new_endpoint()?; let peer1_addr = peer1.socket_addr().await?; let peer2 = qp2p.new_endpoint()?; @@ -109,11 +109,11 @@ async fn bi_directional_streams() -> Result<()> { #[tokio::test] async fn uni_directional_streams() -> Result<()> { let qp2p = new_qp2p(); - let mut peer1 = qp2p.new_endpoint()?; + let peer1 = qp2p.new_endpoint()?; let peer1_addr = peer1.socket_addr().await?; let mut incoming_conn_peer1 = peer1.listen()?; - let mut peer2 = qp2p.new_endpoint()?; + let peer2 = qp2p.new_endpoint()?; let peer2_addr = peer2.socket_addr().await?; let mut incoming_conn_peer2 = peer2.listen()?; diff --git a/src/tests/echo_service.rs b/src/tests/echo_service.rs index caf3b2d2..17bf4072 100644 --- a/src/tests/echo_service.rs +++ b/src/tests/echo_service.rs @@ -13,7 +13,7 @@ async fn echo_service() -> Result<()> { false, )?; // Create Endpoint - let mut peer1 = qp2p.new_endpoint()?; + let peer1 = qp2p.new_endpoint()?; let peer1_addr = peer1.socket_addr().await?; // Listen for messages / connections at peer 1 @@ -29,7 +29,7 @@ async fn echo_service() -> Result<()> { // In parallel create another endpoint and send an EchoServiceReq let handle2 = tokio::spawn(async move { - let mut peer2 = qp2p.new_endpoint()?; + let peer2 = qp2p.new_endpoint()?; let socket_addr = peer2.socket_addr().await?; let connection = peer2.connect_to(&peer1_addr).await?; let (mut send_stream, mut recv_stream) = connection.open_bi_stream().await?;