From 7b78c3e36c0cb0b56653956414ceaa7565f3f101 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20RIBEAU?= Date: Wed, 27 Mar 2024 16:31:59 +0100 Subject: [PATCH] upstream upnp work --- examples/upnp/src/main.rs | 16 +- protocols/upnp/Cargo.toml | 3 + protocols/upnp/src/behaviour.rs | 592 +++++++++++++---------------- protocols/upnp/src/lib.rs | 4 +- protocols/upnp/src/mapping_list.rs | 417 ++++++++++++++++++++ protocols/upnp/src/tokio.rs | 108 +++--- 6 files changed, 757 insertions(+), 383 deletions(-) create mode 100644 protocols/upnp/src/mapping_list.rs diff --git a/examples/upnp/src/main.rs b/examples/upnp/src/main.rs index fd0764990d1..50bdae902b1 100644 --- a/examples/upnp/src/main.rs +++ b/examples/upnp/src/main.rs @@ -31,6 +31,10 @@ async fn main() -> Result<(), Box> { .with_env_filter(EnvFilter::from_default_env()) .try_init(); + let config = upnp::Config::new() + .with_mapping_description("rust-libp2p-upnp-example") + .with_mapping_duration(std::time::Duration::from_secs(30)); + let mut swarm = libp2p::SwarmBuilder::with_new_identity() .with_tokio() .with_tcp( @@ -38,12 +42,13 @@ async fn main() -> Result<(), Box> { noise::Config::new, yamux::Config::default, )? - .with_behaviour(|_| upnp::tokio::Behaviour::default())? + .with_behaviour(|_| upnp::tokio::Behaviour::new(config))? .build(); // Tell the swarm to listen on all interfaces and a random, OS-assigned // port. - swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; + swarm.listen_on("/ip4/0.0.0.0/tcp/5001".parse()?)?; + // swarm.listen_on("/ip4/192.168.1.33/tcp/0".parse()?)?; // Dial the peer identified by the multi-address given as the second // command-line argument, if any. @@ -56,8 +61,11 @@ async fn main() -> Result<(), Box> { loop { match swarm.select_next_some().await { SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {address:?}"), - SwarmEvent::Behaviour(upnp::Event::NewExternalAddr(addr)) => { - println!("New external address: {addr}"); + SwarmEvent::ExternalAddrConfirmed { address } => { + println!("New external address: {address}"); + } + SwarmEvent::ExternalAddrExpired { address } => { + println!("Expired external address: {address}"); } SwarmEvent::Behaviour(upnp::Event::GatewayNotFound) => { println!("Gateway does not support UPnP"); diff --git a/protocols/upnp/Cargo.toml b/protocols/upnp/Cargo.toml index 3119804bf60..636e54e9cca 100644 --- a/protocols/upnp/Cargo.toml +++ b/protocols/upnp/Cargo.toml @@ -20,6 +20,9 @@ tokio = { version = "1.36", default-features = false, features = ["rt"], optiona tracing = "0.1.37" void = "1.0.2" +[dev-dependencies] +tokio = { version = "1.34", default-features = false, features = ["macros", "rt"] } + [features] tokio = ["igd-next/aio_tokio", "dep:tokio"] diff --git a/protocols/upnp/src/behaviour.rs b/protocols/upnp/src/behaviour.rs index a94ef9526dd..fac789b2759 100644 --- a/protocols/upnp/src/behaviour.rs +++ b/protocols/upnp/src/behaviour.rs @@ -21,37 +21,107 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] use std::{ - borrow::Borrow, - collections::{HashMap, VecDeque}, + collections::VecDeque, error::Error, - hash::{Hash, Hasher}, - net::{self, IpAddr, SocketAddr, SocketAddrV4}, - ops::{Deref, DerefMut}, + net::IpAddr, pin::Pin, task::{Context, Poll}, time::Duration, }; -use crate::tokio::{is_addr_global, Gateway}; +use crate::{ + mapping_list::{Mapping, MappingList, MappingState}, + tokio::{is_addr_global, Gateway}, +}; use futures::{channel::oneshot, Future, StreamExt}; use futures_timer::Delay; -use igd_next::PortMappingProtocol; -use libp2p_core::{multiaddr, transport::ListenerId, Endpoint, Multiaddr}; +use libp2p_core::{Endpoint, Multiaddr}; use libp2p_swarm::{ derive_prelude::PeerId, dummy, ConnectionDenied, ConnectionId, ExpiredListenAddr, FromSwarm, NetworkBehaviour, NewListenAddr, ToSwarm, }; +use tracing::debug; + +/// The default duration in seconds of a port mapping on the gateway. +const MAPPING_DURATION: u64 = 3600; + +/// Interval to wait before performing a new mapping attempt. +const MAPPING_RETRY_INTERVAL: u64 = 900; + +/// Default interval to wait before performing a new gateway discovery. +const GW_DISCOVERY_INTERVAL: u64 = 3600; + +#[derive(Debug, Clone)] +pub struct Config { + /// The mapping description. + pub(crate) mapping_description: String, + + /// The duration of the port mapping. + /// Renew will occur at half this duration + pub(crate) mapping_duration: Duration, + + /// In case of failure, the duration between 2 mapping attempts + pub(crate) mapping_retry_interval: Duration, + + /// The duration between gateway discovery. + pub(crate) gw_discovery_interval: Duration, +} + +impl Config { + /// Creates a new [`Config`] with the following default settings: + /// + /// * [`Config::mapping_description`] "libp2p-mapping" + /// * [`Config::mapping_duration`] 1h + /// * [`Config::mapping_retry_interval`] 30mn + /// * [`Config::gw_discovery_interval`] 1h + /// + pub fn new() -> Self { + Self { + mapping_description: "rust-libp2p mapping".to_owned(), + mapping_duration: Duration::from_secs(MAPPING_DURATION), + mapping_retry_interval: Duration::from_secs(MAPPING_RETRY_INTERVAL), + gw_discovery_interval: Duration::from_secs(GW_DISCOVERY_INTERVAL), + } + } + + /// Sets the mapping description + pub fn with_mapping_description(mut self, description: impl Into) -> Self { + self.mapping_description = description.into(); + self + } -/// The duration in seconds of a port mapping on the gateway. -const MAPPING_DURATION: u32 = 3600; + /// Sets the mapping duration. + pub fn with_mapping_duration(mut self, duration: Duration) -> Self { + self.mapping_duration = duration; + self + } + + /// Sets the mapping retry interval. + pub fn with_mapping_retry_interval(mut self, interval: Duration) -> Self { + self.mapping_retry_interval = interval; + self + } + + /// Sets the gateway discovery interval. + pub fn with_gw_discovery_interval(mut self, interval: Duration) -> Self { + self.gw_discovery_interval = interval; + self + } +} -/// Renew the Mapping every half of `MAPPING_DURATION` to avoid the port being unmapped. -const MAPPING_TIMEOUT: u64 = MAPPING_DURATION as u64 / 2; +impl Default for Config { + fn default() -> Self { + Self::new() + } +} /// A [`Gateway`] Request. #[derive(Debug)] pub(crate) enum GatewayRequest { - AddMapping { mapping: Mapping, duration: u32 }, + AddMapping { + mapping: Mapping, + duration: Duration, + }, RemoveMapping(Mapping), } @@ -68,145 +138,32 @@ pub(crate) enum GatewayEvent { RemovalFailure(Mapping, Box), } -/// Mapping of a Protocol and Port on the gateway. -#[derive(Debug, Clone)] -pub(crate) struct Mapping { - pub(crate) listener_id: ListenerId, - pub(crate) protocol: PortMappingProtocol, - pub(crate) multiaddr: Multiaddr, - pub(crate) internal_addr: SocketAddr, -} - -impl Mapping { - /// Given the input gateway address, calculate the - /// open external `Multiaddr`. - fn external_addr(&self, gateway_addr: IpAddr) -> Multiaddr { - let addr = match gateway_addr { - net::IpAddr::V4(ip) => multiaddr::Protocol::Ip4(ip), - net::IpAddr::V6(ip) => multiaddr::Protocol::Ip6(ip), - }; - self.multiaddr - .replace(0, |_| Some(addr)) - .expect("multiaddr should be valid") - } -} - -impl Hash for Mapping { - fn hash(&self, state: &mut H) { - self.listener_id.hash(state); - } -} - -impl PartialEq for Mapping { - fn eq(&self, other: &Self) -> bool { - self.listener_id == other.listener_id - } -} - -impl Eq for Mapping {} - -impl Borrow for Mapping { - fn borrow(&self) -> &ListenerId { - &self.listener_id - } -} - -/// Current state of a [`Mapping`]. -#[derive(Debug)] -enum MappingState { - /// Port mapping is inactive, will be requested or re-requested on the next iteration. - Inactive, - /// Port mapping/removal has been requested on the gateway. - Pending, - /// Port mapping is active with the inner timeout. - Active(Delay), - /// Port mapping failed, we will try again. - Failed, -} - /// Current state of the UPnP [`Gateway`]. enum GatewayState { Searching(oneshot::Receiver>>), Available(Gateway), - GatewayNotFound, + GatewayNotFound(Delay), NonRoutableGateway(IpAddr), } /// The event produced by `Behaviour`. #[derive(Debug)] pub enum Event { - /// The multiaddress is reachable externally. - NewExternalAddr(Multiaddr), - /// The renewal of the multiaddress on the gateway failed. - ExpiredExternalAddr(Multiaddr), /// The IGD gateway was not found. GatewayNotFound, /// The Gateway is not exposed directly to the public network. NonRoutableGateway, } -/// A list of port mappings and its state. -#[derive(Debug, Default)] -struct MappingList(HashMap); - -impl Deref for MappingList { - type Target = HashMap; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for MappingList { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl MappingList { - /// Queue for renewal the current mapped ports on the `Gateway` that are expiring, - /// and try to activate the inactive. - fn renew(&mut self, gateway: &mut Gateway, cx: &mut Context<'_>) { - for (mapping, state) in self.iter_mut() { - match state { - MappingState::Inactive | MappingState::Failed => { - let duration = MAPPING_DURATION; - if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping { - mapping: mapping.clone(), - duration, - }) { - tracing::debug!( - multiaddress=%mapping.multiaddr, - "could not request port mapping for multiaddress on the gateway: {}", - err - ); - } - *state = MappingState::Pending; - } - MappingState::Active(timeout) => { - if Pin::new(timeout).poll(cx).is_ready() { - let duration = MAPPING_DURATION; - if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping { - mapping: mapping.clone(), - duration, - }) { - tracing::debug!( - multiaddress=%mapping.multiaddr, - "could not request port mapping for multiaddress on the gateway: {}", - err - ); - } - } - } - MappingState::Pending => {} - } - } - } -} +type GatewaySearchFunctionResult = oneshot::Receiver>>; +type GatewaySearchFunction = fn(String) -> GatewaySearchFunctionResult; /// A [`NetworkBehaviour`] for UPnP port mapping. Automatically tries to map the external port /// to an internal address on the gateway on a [`FromSwarm::NewListenAddr`]. pub struct Behaviour { + /// Configuration + config: Config, + /// UPnP interface state. state: GatewayState, @@ -215,18 +172,36 @@ pub struct Behaviour { /// Pending behaviour events to be emitted. pending_events: VecDeque, + + gateway_search_function: GatewaySearchFunction, } -impl Default for Behaviour { - fn default() -> Self { +impl Behaviour { + pub fn new(config: Config) -> Self { + Self::new_with_gateway_search_function(config, crate::tokio::search_gateway) + } + + fn new_with_gateway_search_function( + config: Config, + gateway_search_function: GatewaySearchFunction, + ) -> Self { + let description = config.mapping_description.clone(); Self { - state: GatewayState::Searching(crate::tokio::search_gateway()), + config, + state: GatewayState::Searching(gateway_search_function(description)), mappings: Default::default(), pending_events: VecDeque::new(), + gateway_search_function, } } } +impl Default for Behaviour { + fn default() -> Self { + Self::new(Config::new()) + } +} + impl NetworkBehaviour for Behaviour { type ConnectionHandler = dummy::ConnectionHandler; @@ -255,26 +230,18 @@ impl NetworkBehaviour for Behaviour { fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::NewListenAddr(NewListenAddr { - listener_id, + listener_id: _, addr: multiaddr, }) => { - let (addr, protocol) = match multiaddr_to_socketaddr_protocol(multiaddr.clone()) { - Ok(addr_port) => addr_port, - Err(()) => { - tracing::debug!("multiaddress not supported for UPnP {multiaddr}"); - return; - } + let Ok(mapping) = Mapping::try_from(multiaddr) else { + return; }; - if let Some((mapping, _state)) = self - .mappings - .iter() - .find(|(mapping, _state)| mapping.internal_addr.port() == addr.port()) - { - tracing::debug!( - multiaddress=%multiaddr, - mapped_multiaddress=%mapping.multiaddr, - "port from multiaddress is already being mapped" + if let Some((existing_mapping, _state)) = self.mappings.get_key_value(&mapping) { + debug!( + %multiaddr, + mapped_multiaddress=%existing_mapping.multiaddr, + "multiaddr is already being mapped" ); return; } @@ -283,67 +250,55 @@ impl NetworkBehaviour for Behaviour { GatewayState::Searching(_) => { // As the gateway is not yet available we add the mapping with `MappingState::Inactive` // so that when and if it becomes available we map it. - self.mappings.insert( - Mapping { - listener_id, - protocol, - internal_addr: addr, - multiaddr: multiaddr.clone(), - }, - MappingState::Inactive, - ); + self.mappings.insert(mapping, MappingState::Inactive); } GatewayState::Available(ref mut gateway) => { - let mapping = Mapping { - listener_id, - protocol, - internal_addr: addr, - multiaddr: multiaddr.clone(), - }; - - let duration = MAPPING_DURATION; - if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping { + match gateway.sender.try_send(GatewayRequest::AddMapping { mapping: mapping.clone(), - duration, + duration: self.config.mapping_duration, }) { - tracing::debug!( - multiaddress=%mapping.multiaddr, - "could not request port mapping for multiaddress on the gateway: {}", - err - ); - } - - self.mappings.insert(mapping, MappingState::Pending); + Ok(_) => self.mappings.insert(mapping, MappingState::Pending), + Err(error) => { + debug!( + multiaddr=%mapping.multiaddr, + "could not request port mapping for multiaddr on the gateway: {error:?}", + ); + self.mappings.insert(mapping, MappingState::Inactive) + } + }; } - GatewayState::GatewayNotFound => { - tracing::debug!( - multiaddres=%multiaddr, - "network gateway not found, UPnP port mapping of multiaddres discarded" + GatewayState::GatewayNotFound(_) => { + debug!( + %multiaddr, + "network gateway not found, UPnP port mapping of multiaddr discarded" ); } GatewayState::NonRoutableGateway(addr) => { - tracing::debug!( - multiaddress=%multiaddr, + debug!( + %multiaddr, network_gateway_ip=%addr, - "the network gateway is not exposed to the public network. / - UPnP port mapping of multiaddress discarded" + "the network gateway is not exposed to the public network. UPnP port mapping of multiaddr discarded" ); } }; } FromSwarm::ExpiredListenAddr(ExpiredListenAddr { - listener_id, - addr: _addr, + listener_id: _, + addr, }) => { - if let GatewayState::Available(ref mut gateway) = &mut self.state { - if let Some((mapping, _state)) = self.mappings.remove_entry(&listener_id) { + let Ok(mapping) = Mapping::try_from(addr) else { + return; + }; + + if let Some((mapping, _state)) = self.mappings.remove_entry(&mapping) { + if let GatewayState::Available(ref mut gateway) = &mut self.state { if let Err(err) = gateway .sender .try_send(GatewayRequest::RemoveMapping(mapping.clone())) { - tracing::debug!( - multiaddress=%mapping.multiaddr, - "could not request port removal for multiaddress on the gateway: {}", + debug!( + multiaddr=%mapping.multiaddr, + "could not request port removal for multiaddr on the gateway: {}", err ); } @@ -377,171 +332,142 @@ impl NetworkBehaviour for Behaviour { // Loop through the gateway state so that if it changes from `Searching` to `Available` // we poll the pending mapping requests. loop { - match self.state { - GatewayState::Searching(ref mut fut) => match Pin::new(fut).poll(cx) { + match &mut self.state { + GatewayState::Searching(fut) => match Pin::new(fut).poll(cx) { Poll::Ready(result) => { match result.expect("sender shouldn't have been dropped") { - Ok(gateway) => { - if !is_addr_global(gateway.external_addr) { - self.state = - GatewayState::NonRoutableGateway(gateway.external_addr); - tracing::debug!( - gateway_address=%gateway.external_addr, - "the gateway is not routable" - ); + Ok(gw) => { + if !is_addr_global(gw.external_addr) { + self.state = GatewayState::NonRoutableGateway(gw.external_addr); + debug!(gw_addr = %gw.internal_addr, external_gw_addr = %gw.external_addr, "Found unroutable gateway"); return Poll::Ready(ToSwarm::GenerateEvent( Event::NonRoutableGateway, )); } - self.state = GatewayState::Available(gateway); + debug!(gw_addr = %gw.internal_addr, external_gw_addr = %gw.external_addr, "Found gateway"); + self.state = GatewayState::Available(gw); } - Err(err) => { - tracing::debug!("could not find gateway: {err}"); - self.state = GatewayState::GatewayNotFound; + Err(error) => { + debug!( + ?error, + "Failed to find gateway - Scheduling next gateway search in {}s", + self.config.gw_discovery_interval.as_secs() + ); + self.state = GatewayState::GatewayNotFound(Delay::new( + self.config.gw_discovery_interval, + )); return Poll::Ready(ToSwarm::GenerateEvent(Event::GatewayNotFound)); } } } Poll::Pending => return Poll::Pending, }, - GatewayState::Available(ref mut gateway) => { + GatewayState::Available(gateway) => { // Poll pending mapping requests. - if let Poll::Ready(Some(result)) = gateway.receiver.poll_next_unpin(cx) { - match result { - GatewayEvent::Mapped(mapping) => { - let new_state = MappingState::Active(Delay::new( - Duration::from_secs(MAPPING_TIMEOUT), - )); - - match self - .mappings - .insert(mapping.clone(), new_state) - .expect("mapping should exist") - { - MappingState::Pending => { - let external_multiaddr = - mapping.external_addr(gateway.external_addr); - self.pending_events.push_back(Event::NewExternalAddr( - external_multiaddr.clone(), - )); - tracing::debug!( - address=%mapping.internal_addr, - protocol=%mapping.protocol, - "successfully mapped UPnP for protocol" - ); - return Poll::Ready(ToSwarm::ExternalAddrConfirmed( - external_multiaddr, - )); - } - MappingState::Active(_) => { - tracing::debug!( - address=%mapping.internal_addr, - protocol=%mapping.protocol, - "successfully renewed UPnP mapping for protocol" - ); - } - _ => unreachable!(), - } - } - GatewayEvent::MapFailure(mapping, err) => { - match self - .mappings - .insert(mapping.clone(), MappingState::Failed) - .expect("mapping should exist") - { - MappingState::Active(_) => { - tracing::debug!( - address=%mapping.internal_addr, - protocol=%mapping.protocol, - "failed to remap UPnP mapped for protocol: {err}" - ); - let external_multiaddr = - mapping.external_addr(gateway.external_addr); - self.pending_events.push_back(Event::ExpiredExternalAddr( - external_multiaddr.clone(), - )); - return Poll::Ready(ToSwarm::ExternalAddrExpired( - external_multiaddr, - )); - } - MappingState::Pending => { - tracing::debug!( - address=%mapping.internal_addr, - protocol=%mapping.protocol, - "failed to map UPnP mapped for protocol: {err}" - ); - } - _ => { - unreachable!() - } - } - } - GatewayEvent::Removed(mapping) => { - tracing::debug!( - address=%mapping.internal_addr, - protocol=%mapping.protocol, - "successfully removed UPnP mapping for protocol" - ); - self.mappings - .remove(&mapping) - .expect("mapping should exist"); - } - GatewayEvent::RemovalFailure(mapping, err) => { - tracing::debug!( - address=%mapping.internal_addr, - protocol=%mapping.protocol, - "could not remove UPnP mapping for protocol: {err}" - ); - if let Err(err) = gateway - .sender - .try_send(GatewayRequest::RemoveMapping(mapping.clone())) - { - tracing::debug!( - multiaddress=%mapping.multiaddr, - "could not request port removal for multiaddress on the gateway: {}", - err - ); - } - } + let gateway_receiver_poll_result = gateway.receiver.poll_next_unpin(cx); + if let Poll::Ready(Some(event)) = gateway_receiver_poll_result { + if let Some(to_swarm) = self.mappings.handle_gateway_event( + event, + &self.config, + gateway.external_addr, + ) { + return Poll::Ready(to_swarm); } + continue; } // Renew expired and request inactive mappings. - self.mappings.renew(gateway, cx); + for mapping in self.mappings.renew(cx) { + if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping { + mapping: mapping.clone(), + duration: self.config.mapping_duration, + }) { + debug!( + multiaddr=%mapping.multiaddr, + "could not request port mapping for multiaddr on the gateway: {}", + err + ); + } + } return Poll::Pending; } + GatewayState::GatewayNotFound(timer) => match Pin::new(timer).poll(cx) { + Poll::Ready(_) => { + self.state = GatewayState::Searching((self.gateway_search_function)( + self.config.mapping_description.clone(), + )); + } + Poll::Pending => return Poll::Pending, + }, _ => return Poll::Pending, } } } } -/// Extracts a [`SocketAddrV4`] and [`PortMappingProtocol`] from a given [`Multiaddr`]. -/// -/// Fails if the given [`Multiaddr`] does not begin with an IP -/// protocol encapsulating a TCP or UDP port. -fn multiaddr_to_socketaddr_protocol( - addr: Multiaddr, -) -> Result<(SocketAddr, PortMappingProtocol), ()> { - let mut iter = addr.into_iter(); - match iter.next() { - // Idg only supports Ipv4. - Some(multiaddr::Protocol::Ip4(ipv4)) if ipv4.is_private() => match iter.next() { - Some(multiaddr::Protocol::Tcp(port)) => { - return Ok(( - SocketAddr::V4(SocketAddrV4::new(ipv4, port)), - PortMappingProtocol::TCP, - )); - } - Some(multiaddr::Protocol::Udp(port)) => { - return Ok(( - SocketAddr::V4(SocketAddrV4::new(ipv4, port)), - PortMappingProtocol::UDP, - )); - } - _ => {} - }, - _ => {} +#[cfg(test)] +mod tests { + use std::{ + sync::atomic::{AtomicUsize, Ordering}, + time::Duration, + }; + + use super::*; + + #[cfg(feature = "tokio")] + #[tokio::test] + async fn being_called_back_after_failed_gateway_discovery() { + const FLAKES_AVOIDANCE_MS: u64 = 10; + const SEARCH_DELAY_MS: u64 = 10; + const GW_DISCOVERY_INTERVAL_MS: u64 = 2000; + + static CALL_COUNT: AtomicUsize = AtomicUsize::new(0); + fn search_function(_description: String) -> GatewaySearchFunctionResult { + let (search_result_sender, search_result_receiver) = oneshot::channel(); + + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(SEARCH_DELAY_MS)).await; + CALL_COUNT.fetch_add(1, Ordering::SeqCst); + search_result_sender.send(Err("Failed".into())).unwrap(); + }); + + search_result_receiver + } + assert_eq!(0, CALL_COUNT.load(Ordering::SeqCst)); + + let config = Config::new() + .with_gw_discovery_interval(Duration::from_millis(GW_DISCOVERY_INTERVAL_MS)); + let mut behaviour = Behaviour::new_with_gateway_search_function(config, search_function); + + match tokio::time::timeout( + Duration::from_millis(SEARCH_DELAY_MS + FLAKES_AVOIDANCE_MS), + std::future::poll_fn(|cx| behaviour.poll(cx)), + ) + .await + { + Ok(to_swarm) => assert!(matches!( + to_swarm, + ToSwarm::GenerateEvent(Event::GatewayNotFound) + )), + Err(_) => assert!(false, "Search function should be invoked in less than 10ms"), + }; + assert_eq!(1, CALL_COUNT.load(Ordering::SeqCst)); + + match tokio::time::timeout( + Duration::from_millis(GW_DISCOVERY_INTERVAL_MS + SEARCH_DELAY_MS + SEARCH_DELAY_MS), + std::future::poll_fn(|cx| behaviour.poll(cx)), + ) + .await + { + Ok(to_swarm) => assert!(matches!( + to_swarm, + ToSwarm::GenerateEvent(Event::GatewayNotFound) + )), + Err(_) => assert!( + false, + "Search function should be called back in less than 2s" + ), + }; + assert_eq!(2, CALL_COUNT.load(Ordering::SeqCst)); } - Err(()) } diff --git a/protocols/upnp/src/lib.rs b/protocols/upnp/src/lib.rs index 8a74d7e8f63..d3e3c8f7f20 100644 --- a/protocols/upnp/src/lib.rs +++ b/protocols/upnp/src/lib.rs @@ -31,7 +31,9 @@ #[cfg(feature = "tokio")] mod behaviour; #[cfg(feature = "tokio")] +mod mapping_list; +#[cfg(feature = "tokio")] pub mod tokio; #[cfg(feature = "tokio")] -pub use behaviour::Event; +pub use behaviour::{Config, Event}; diff --git a/protocols/upnp/src/mapping_list.rs b/protocols/upnp/src/mapping_list.rs new file mode 100644 index 00000000000..98f71230546 --- /dev/null +++ b/protocols/upnp/src/mapping_list.rs @@ -0,0 +1,417 @@ +use std::{ + collections::HashMap, + hash::{Hash, Hasher}, + net::{IpAddr, SocketAddr, SocketAddrV4}, + ops::{Deref, DerefMut}, + pin::Pin, + task::Context, +}; + +use crate::behaviour::GatewayEvent; +use futures::Future; +use futures_timer::Delay; +use igd_next::PortMappingProtocol; +use libp2p_core::{multiaddr, Multiaddr}; +use libp2p_swarm::ToSwarm; +use tracing::debug; +use void::Void; + +use crate::behaviour::{Config, Event}; + +/// Mapping of a Protocol and Port on the gateway. +#[derive(Debug, Clone)] +pub(crate) struct Mapping { + pub(crate) protocol: PortMappingProtocol, + pub(crate) internal_addr: SocketAddr, + pub(crate) multiaddr: Multiaddr, +} + +impl Mapping { + /// Given the input gateway address, calculate the + /// open external `Multiaddr`. + fn external_addr(&self, gateway_addr: IpAddr) -> Multiaddr { + let addr = multiaddr::Protocol::from(gateway_addr); + self.multiaddr + .replace(0, |_| Some(addr)) + .expect("multiaddr should be valid") + } +} + +impl Hash for Mapping { + fn hash(&self, state: &mut H) { + std::mem::discriminant(&self.protocol).hash(state); + self.internal_addr.hash(state); + } +} + +impl Eq for Mapping {} +impl PartialEq for Mapping { + fn eq(&self, other: &Self) -> bool { + self.protocol == other.protocol && self.internal_addr == other.internal_addr + } +} + +impl TryFrom<&Multiaddr> for Mapping { + type Error = (); + + fn try_from(multiaddr: &Multiaddr) -> Result { + let (internal_addr, protocol) = match multiaddr_to_socketaddr_protocol(multiaddr) { + Ok(addr_port) => addr_port, + Err(()) => { + debug!("multiaddr not supported for UPnP {multiaddr}"); + return Err(()); + } + }; + + Ok(Mapping { + protocol, + internal_addr, + multiaddr: multiaddr.clone(), + }) + } +} + +/// Current state of a [`Mapping`]. +#[derive(Debug)] +pub(crate) enum MappingState { + /// Port mapping is inactive, will be requested or re-requested on the next iteration. + Inactive, + /// Port mapping/removal has been requested on the gateway. + Pending, + /// Port mapping is active with the inner timeout or none if renewal is already in progress. + Active(Option), + /// Port mapping failed, we will try again. + Failed(Delay), +} + +/// A list of port mappings and its state. +#[derive(Debug, Default)] +pub(crate) struct MappingList(HashMap); + +impl Deref for MappingList { + type Target = HashMap; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for MappingList { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl MappingList { + /// Queue for renewal the current mapped ports on the `Gateway` that are expiring, + /// and try to activate the inactive. + pub(crate) fn renew(&mut self, cx: &mut Context<'_>) -> Vec { + let mut mappings_to_renew = Vec::new(); + + for (mapping, state) in self.iter_mut() { + let request_mapping = match state { + MappingState::Inactive => { + *state = MappingState::Pending; + true + } + MappingState::Failed(timer) => { + let ready_to_renew = Pin::new(timer).poll(cx).is_ready(); + if ready_to_renew { + *state = MappingState::Pending; + } + ready_to_renew + } + MappingState::Active(Some(timeout)) => { + let ready_to_renew = Pin::new(timeout).poll(cx).is_ready(); + if ready_to_renew { + *state = MappingState::Active(None); + } + ready_to_renew + } + MappingState::Active(None) | MappingState::Pending => false, + }; + if request_mapping { + mappings_to_renew.push(mapping.clone()); + } + } + + mappings_to_renew + } + + pub(crate) fn handle_gateway_event( + &mut self, + event: GatewayEvent, + config: &Config, + gateway_external_addr: IpAddr, + ) -> Option> { + match event { + GatewayEvent::Mapped(mapping) => { + if let Some(MappingState::Pending) = self.insert( + mapping.clone(), + MappingState::Active(Some(Delay::new(config.mapping_duration / 2))), + ) { + debug!(?mapping, "UPnP mapping activated successfully"); + let external_multiaddr = mapping.external_addr(gateway_external_addr); + return Some(ToSwarm::ExternalAddrConfirmed(external_multiaddr)); + } else { + debug!(?mapping, "UPnP mapping renewed successfully"); + } + } + GatewayEvent::MapFailure(mapping, err) => { + if let Some(MappingState::Active(_)) = self.insert( + mapping.clone(), + MappingState::Failed(Delay::new(config.mapping_retry_interval)), + ) { + debug!(?mapping, "UPnP mapping failed to be renewed: {err}"); + let external_multiaddr = mapping.external_addr(gateway_external_addr); + return Some(ToSwarm::ExternalAddrExpired(external_multiaddr)); + } else { + debug!(?mapping, "UPnP mapping failed to be activated: {err}"); + } + } + GatewayEvent::Removed(mapping) => { + debug!(?mapping, "UPnP mapping removed successfully"); + self.remove(&mapping); + } + GatewayEvent::RemovalFailure(mapping, err) => { + debug!(?mapping, "UPnP mapping failed to be removed: {err}"); + // Do not trigger a removal to avoid infinite loop on RemovalFailure. + // Expiration should remove it anyway. + self.remove(&mapping); + } + }; + + None + } +} + +/// Extracts a [`SocketAddrV4`] and [`PortMappingProtocol`] from a given [`Multiaddr`]. +/// +/// Fails if the given [`Multiaddr`] does not begin with an IP +/// protocol encapsulating a TCP or UDP port. +fn multiaddr_to_socketaddr_protocol( + addr: &Multiaddr, +) -> Result<(SocketAddr, PortMappingProtocol), ()> { + let mut iter = addr.into_iter(); + match iter.next() { + // Idg only supports Ipv4. + Some(multiaddr::Protocol::Ip4(ipv4)) if ipv4.is_private() => match iter.next() { + Some(multiaddr::Protocol::Tcp(port)) => { + return Ok(( + SocketAddr::V4(SocketAddrV4::new(ipv4, port)), + PortMappingProtocol::TCP, + )); + } + Some(multiaddr::Protocol::Udp(port)) => { + return Ok(( + SocketAddr::V4(SocketAddrV4::new(ipv4, port)), + PortMappingProtocol::UDP, + )); + } + _ => {} + }, + _ => {} + } + Err(()) +} + +#[cfg(test)] +mod tests { + use std::task::Poll; + use std::time::Duration; + + use super::*; + + const FLAKES_AVOIDANCE_MS: u64 = 10; + + fn fake_mapping_1() -> Mapping { + Mapping { + protocol: PortMappingProtocol::TCP, + internal_addr: "192.168.1.2:1234".parse().unwrap(), + multiaddr: "/ip4/192.168.1.2/tcp/1234".parse().unwrap(), + } + } + + #[cfg(feature = "tokio")] + #[tokio::test] + async fn renew_inactive() { + let mut mapping_list = MappingList::default(); + mapping_list.insert(fake_mapping_1(), MappingState::Inactive); + let to_renew = std::future::poll_fn(|cx| Poll::Ready(mapping_list.renew(cx))).await; + assert_eq!(to_renew, vec![fake_mapping_1()]); + assert!(matches!( + mapping_list.get(&fake_mapping_1()), + Some(MappingState::Pending) + )); + + // don't renew while renewing is already in progress + let to_renew = std::future::poll_fn(|cx| Poll::Ready(mapping_list.renew(cx))).await; + assert_eq!(to_renew, vec![]); + } + + #[cfg(feature = "tokio")] + #[tokio::test] + async fn renew_pending() { + let mut mapping_list = MappingList::default(); + mapping_list.insert(fake_mapping_1(), MappingState::Pending); + let to_renew = std::future::poll_fn(|cx| Poll::Ready(mapping_list.renew(cx))).await; + assert_eq!(to_renew, vec![]); + assert!(matches!( + mapping_list.get(&fake_mapping_1()), + Some(MappingState::Pending) + )); + + // don't renew while renewing is already in progress + let to_renew = std::future::poll_fn(|cx| Poll::Ready(mapping_list.renew(cx))).await; + assert_eq!(to_renew, vec![]); + } + + async fn renew_timeout( + duration_ms: u64, + mapping_list: &mut MappingList, + ) -> Result, tokio::time::error::Elapsed> { + tokio::time::timeout( + Duration::from_millis(duration_ms), + std::future::poll_fn(|cx| { + let to_renew = mapping_list.renew(cx); + match to_renew.is_empty() { + true => Poll::Pending, + false => Poll::Ready(to_renew), + } + }), + ) + .await + } + + #[cfg(feature = "tokio")] + #[tokio::test] + async fn renew_failed() { + const RETRY_INTERVAL_MS: u64 = 30; + let mut mapping_list = MappingList::default(); + mapping_list.insert( + fake_mapping_1(), + MappingState::Failed(Delay::new(Duration::from_millis(RETRY_INTERVAL_MS))), + ); + + assert!( + renew_timeout(RETRY_INTERVAL_MS - FLAKES_AVOIDANCE_MS, &mut mapping_list) + .await + .is_err(), + "Renew should not trigger before 30ms" + ); + + match renew_timeout(RETRY_INTERVAL_MS * 2, &mut mapping_list).await { + Ok(to_renew) => assert_eq!(to_renew, vec![fake_mapping_1()]), + Err(_) => assert!(false, "Renew should trigger after 30ms"), + }; + assert!(matches!( + mapping_list.get(&fake_mapping_1()), + Some(MappingState::Pending) + )); + + // don't renew while renewing is already in progress + let to_renew = std::future::poll_fn(|cx| Poll::Ready(mapping_list.renew(cx))).await; + assert_eq!(to_renew, vec![]); + } + + #[cfg(feature = "tokio")] + #[tokio::test] + async fn renew_active() { + const RETRY_INTERVAL_MS: u64 = 30; + let mut mapping_list = MappingList::default(); + mapping_list.insert( + fake_mapping_1(), + MappingState::Active(Some(Delay::new(Duration::from_millis(RETRY_INTERVAL_MS)))), + ); + + assert!( + renew_timeout(RETRY_INTERVAL_MS - FLAKES_AVOIDANCE_MS, &mut mapping_list) + .await + .is_err(), + "Renew should not trigger before 30ms" + ); + + match renew_timeout(RETRY_INTERVAL_MS * 2, &mut mapping_list).await { + Ok(to_renew) => assert_eq!(to_renew, vec![fake_mapping_1()]), + Err(_) => assert!(false, "Renew should trigger after 30ms"), + }; + assert!(matches!( + mapping_list.get(&fake_mapping_1()), + Some(MappingState::Active(None)) + )); + + // don't renew while renewing is already in progress + let to_renew = std::future::poll_fn(|cx| Poll::Ready(mapping_list.renew(cx))).await; + assert_eq!(to_renew, vec![]); + } + + #[test] + fn handle_gateway_event_mapped() { + let external_addr = "8.8.8.8".parse().unwrap(); + let mut mapping_list = MappingList::default(); + mapping_list.insert(fake_mapping_1(), MappingState::Pending); + let to_swarm_opt = mapping_list.handle_gateway_event( + GatewayEvent::Mapped(fake_mapping_1()), + &Config::new(), + external_addr, + ); + + match to_swarm_opt { + Some(ToSwarm::ExternalAddrConfirmed(addr)) => { + assert_eq!(addr, "/ip4/8.8.8.8/tcp/1234".parse().unwrap()) + } + _ => assert!(false), + }; + + assert!(matches!( + mapping_list.get(&fake_mapping_1()), + Some(MappingState::Active(Some(_))) + )); + + let to_swarm_opt = mapping_list.handle_gateway_event( + GatewayEvent::Mapped(fake_mapping_1()), + &Config::new(), + external_addr, + ); + + assert!(to_swarm_opt.is_none()); + } + + #[test] + fn handle_gateway_event_map_failure() { + let external_addr = "8.8.8.8".parse().unwrap(); + let mut mapping_list = MappingList::default(); + mapping_list.insert(fake_mapping_1(), MappingState::Active(None)); + let to_swarm_opt = mapping_list.handle_gateway_event( + GatewayEvent::MapFailure( + fake_mapping_1(), + igd_next::AddPortError::ActionNotAuthorized.into(), + ), + &Config::new(), + external_addr, + ); + + match to_swarm_opt { + Some(ToSwarm::ExternalAddrExpired(addr)) => { + assert_eq!(addr, "/ip4/8.8.8.8/tcp/1234".parse().unwrap()) + } + _ => assert!(false), + }; + + assert!(matches!( + mapping_list.get(&fake_mapping_1()), + Some(MappingState::Failed(_)) + )); + + let to_swarm_opt = mapping_list.handle_gateway_event( + GatewayEvent::MapFailure( + fake_mapping_1(), + igd_next::AddPortError::ActionNotAuthorized.into(), + ), + &Config::new(), + external_addr, + ); + + assert!(to_swarm_opt.is_none()); + } +} diff --git a/protocols/upnp/src/tokio.rs b/protocols/upnp/src/tokio.rs index 9c8b2cafef9..8780cacfa4c 100644 --- a/protocols/upnp/src/tokio.rs +++ b/protocols/upnp/src/tokio.rs @@ -18,7 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use std::{error::Error, net::IpAddr}; +use std::{ + error::Error, + net::{IpAddr, SocketAddr}, +}; use crate::behaviour::{GatewayEvent, GatewayRequest}; use futures::{ @@ -26,6 +29,7 @@ use futures::{ SinkExt, StreamExt, }; use igd_next::SearchOptions; +use tracing::{debug, trace, trace_span, warn, Instrument}; pub use crate::behaviour::Behaviour; @@ -87,10 +91,13 @@ pub(crate) fn is_addr_global(addr: IpAddr) -> bool { pub(crate) struct Gateway { pub(crate) sender: mpsc::Sender, pub(crate) receiver: mpsc::Receiver, + pub(crate) internal_addr: SocketAddr, pub(crate) external_addr: IpAddr, } -pub(crate) fn search_gateway() -> oneshot::Receiver>> { +pub(crate) fn search_gateway( + description: String, +) -> oneshot::Receiver>> { let (search_result_sender, search_result_receiver) = oneshot::channel(); let (events_sender, mut task_receiver) = mpsc::channel(10); @@ -113,56 +120,67 @@ pub(crate) fn search_gateway() -> oneshot::Receiver { - let gateway = gateway.clone(); - match gateway - .add_port( - mapping.protocol, - mapping.internal_addr.port(), - mapping.internal_addr, - duration, - "rust-libp2p mapping", - ) - .await - { - Ok(()) => GatewayEvent::Mapped(mapping), - Err(err) => GatewayEvent::MapFailure(mapping, err.into()), + // Check if receiver dropped. + if search_result_sender.send(Ok(found_gateway)).is_err() { + return; + } + + loop { + // The task sender has dropped so we can return. + let Some(req) = task_receiver.next().await else { + debug!("Command sender has gone => Stopping gateway loop"); + return; + }; + let event = match req { + GatewayRequest::AddMapping { mapping, duration } => { + trace!(?mapping, "Asking gateway to add mapping"); + match gateway + .add_port( + mapping.protocol, + mapping.internal_addr.port(), + mapping.internal_addr, + duration.as_secs() as u32, + &description, + ) + .await + { + Ok(()) => GatewayEvent::Mapped(mapping), + Err(err) => GatewayEvent::MapFailure(mapping, err.into()), + } } - } - GatewayRequest::RemoveMapping(mapping) => { - let gateway = gateway.clone(); - match gateway - .remove_port(mapping.protocol, mapping.internal_addr.port()) - .await - { - Ok(()) => GatewayEvent::Removed(mapping), - Err(err) => GatewayEvent::RemovalFailure(mapping, err.into()), + GatewayRequest::RemoveMapping(mapping) => { + trace!(?mapping, "Asking gateway to remove mapping"); + match gateway + .remove_port(mapping.protocol, mapping.internal_addr.port()) + .await + { + Ok(()) => GatewayEvent::Removed(mapping), + Err(err) => GatewayEvent::RemovalFailure(mapping, err.into()), + } } - } - }; - task_sender - .send(event) - .await - .expect("receiver should be available"); + }; + if let Err(event) = task_sender + .send(event) + .await { + warn!(?event, "Failed to send gateway event"); + return; + } + } } + .instrument(trace_span!("gateway", gw_addr = %gateway_internal_addr, external_gw_addr = %gateway_external_addr)) + .await }); search_result_receiver