Skip to content

Commit

Permalink
Refactor 'addressmanager', reorder fetching sources for-external ip (#…
Browse files Browse the repository at this point in the history
…300)

* Refactor 'addressmanager' to improve readability and maintainability

The 'addressmanager' component of the codebase has been thoroughly refined for improved readability and maintainability. Firstly, the logic for assigning local network addresses has been streamlined and broken down into smaller methods for more clarity. Old methods 'add_routable_addresses_from_net_interfaces' and 'configured_address' have been replaced by 'routable_addresses_from_net_interfaces', 'local_addresses', and 'upnp'. This also led to the removal of redundant code.

* clippy fixes

* Refactor upnp error handling and add UpnpError enum

Introduced thiserror dependency and created UpnpError enum in the addressmanager module. The enum encapsulates possible errors from Universal plug and play (upnp) functionality in idg_next crate, providing a more granular approach to error handling. The upnp function was also refactored to return a Result type - improving code readability and easing error propagation.

* Update log messages to include UPnP context

Modified log messages in the addressmanager and port_mapping_extender modules to improve readability by including the '[UPnP]' context. The messages were updated in multiple sections where the upnp() function was used, thus helping in easier identification and debugging of UPnP related issues.

* Add UPnP context to existing port mapping detection

Enhanced the port mapping detection functionality in the address manager to handle cases where the default port is already in use by another service. This change allows the gateway to map to a random external port if the default one is already mapped. This is crucial for avoiding port mapping conflicts under UPnP protocol, and hence, improving the robustness of the network setup process.

* Refactor existing port mapping check logic in UPnP

Refactored the logic used to check whether the default external port is already mapped under UPnP in the address manager. The changes allow the application to handle scenarios where the port is in use by another service more effectively, ultimately enabling the gateway to map to another external port when required. This refinement is critical for averting potential port mapping conflicts, thereby enhancing the reliability of the network initialization process.

* Add cleanup logic for UPnP port removal

Added an additional step to handle the removal of a mapped port in the Universal Plug and Play (UPnP) protocol when stopping a service. Included a warning message if the port removal fails and an informational message upon successful removal. This enhancement improves service lifecycle management by ensuring that resources are appropriately released when a service is stopped, preventing potential port leaking issues.

* found -> Found

* "Handle specific port mapping errors in address manager

Updated the address manager component to specifically handle `RequestError` and `SpecifiedArrayIndexInvalid` errors from the `GetGenericPortMappingEntry` method. This change results in improved error logging and handling, improving overall reliability of the UPnP functionality."

* Remove unused import in address manager component

* desciption -> Description

* Refactor itertools usage in addressmanager

This commit simplifies the use of the itertools library in our address manager module to improve readability. Instead of using the full `itertools::Either::` syntax every time, we've destructured `Either::{Left, Right}` at the beginning, allowing for leaner and semantically clearer code.

* This commit introduces a documentary comment explaining the loop that
checks for existing port mappings in the UPnP-enabled Internet Gateway Device (IGD).
The clarification helps developers understand the necessity of this loop, especially
in scenarios where the IGD might not explicitly signal port conflicts.
  • Loading branch information
biryukovmaxim authored Oct 25, 2023
1 parent 34df5e6 commit 19ea1a6
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 105 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/addressmanager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ parking_lot.workspace = true
borsh.workspace = true
log.workspace = true
tokio.workspace = true

thiserror.workspace = true
local-ip-address = "0.5.3"
igd-next = { version = "0.14.2", features = ["aio_tokio"] }

Expand Down
267 changes: 166 additions & 101 deletions components/addressmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,25 @@ mod port_mapping_extender;
mod stores;
extern crate self as address_manager;

use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
use std::{collections::HashSet, iter, net::SocketAddr, sync::Arc, time::Duration};

use address_manager::port_mapping_extender::Extender;
use igd_next::{self as igd, aio::tokio::Tokio, AddPortError, Gateway};
use itertools::Itertools;
use igd_next::{
self as igd, aio::tokio::Tokio, AddAnyPortError, AddPortError, Gateway, GetExternalIpError, GetGenericPortMappingEntryError,
SearchError,
};
use itertools::{
Either::{Left, Right},
Itertools,
};
use kaspa_consensus_core::config::Config;
use kaspa_core::task::tick::TickService;
use kaspa_core::{debug, info, time::unix_now, warn};
use kaspa_core::{debug, info, task::tick::TickService, time::unix_now, warn};
use kaspa_database::prelude::{StoreResultExtensions, DB};
use kaspa_utils::networking::IpAddress;
use local_ip_address::list_afinet_netifas;
use parking_lot::Mutex;
use stores::banned_address_store::{BannedAddressesStore, BannedAddressesStoreReader, ConnectionBanTimestamp, DbBannedAddressesStore};
use thiserror::Error;

pub use stores::NetAddress;

Expand All @@ -33,6 +39,18 @@ struct ExtendHelper {
external_port: u16,
}

#[derive(Error, Debug)]
pub enum UpnpError {
#[error(transparent)]
AddPortError(#[from] AddPortError),
#[error(transparent)]
AddAnyPortError(#[from] AddAnyPortError),
#[error(transparent)]
SearchError(#[from] SearchError),
#[error(transparent)]
GetExternalIpError(#[from] GetExternalIpError),
}

pub struct AddressManager {
banned_address_store: DbBannedAddressesStore,
address_store: address_store_with_cache::Store,
Expand All @@ -55,124 +73,171 @@ impl AddressManager {
}

fn init_local_addresses(&mut self, tick_service: Arc<TickService>) -> Option<Extender> {
if let Some((net_addr, extend)) = self.configured_address() {
self.local_net_addresses.push(net_addr);

if let Some(ExtendHelper { gateway, local_addr, external_port }) = extend {
let gateway: igd_next::aio::Gateway<Tokio> = igd_next::aio::Gateway {
addr: gateway.addr,
root_url: gateway.root_url,
control_url: gateway.control_url,
control_schema_url: gateway.control_schema_url,
control_schema: gateway.control_schema,
provider: Tokio,
};
Some(Extender::new(
tick_service,
Duration::from_secs(UPNP_EXTEND_PERIOD),
UPNP_DEADLINE_SEC,
gateway,
external_port,
local_addr,
))
} else {
None
}
self.local_net_addresses = self.local_addresses().collect();

let extender = if self.local_net_addresses.is_empty() && !self.config.disable_upnp {
let (net_address, ExtendHelper { gateway, local_addr, external_port }) = match self.upnp() {
Err(err) => {
warn!("[UPnP] Error adding port mapping: {err}");
return None;
}
Ok(None) => return None,
Ok(Some((net_address, extend_helper))) => (net_address, extend_helper),
};
self.local_net_addresses.push(net_address);

let gateway: igd_next::aio::Gateway<Tokio> = igd_next::aio::Gateway {
addr: gateway.addr,
root_url: gateway.root_url,
control_url: gateway.control_url,
control_schema_url: gateway.control_schema_url,
control_schema: gateway.control_schema,
provider: Tokio,
};
Some(Extender::new(
tick_service,
Duration::from_secs(UPNP_EXTEND_PERIOD),
UPNP_DEADLINE_SEC,
gateway,
external_port,
local_addr,
))
} else {
self.add_routable_addresses_from_net_interfaces();
None
}
};

self.local_net_addresses.iter().for_each(|net_addr| {
info!("Publicly routable local address {} added to store", net_addr);
});
extender
}

fn configured_address(&self) -> Option<(NetAddress, Option<ExtendHelper>)> {
fn local_addresses(&self) -> impl Iterator<Item = NetAddress> + '_ {
match self.config.externalip {
// An external IP was passed, we will try to bind that if it's valid
Some(local_net_address) if local_net_address.ip.is_publicly_routable() => {
info!("External address {} added to store", local_net_address);
Some((local_net_address, None))
info!("External address is publicly routable {}", local_net_address);
return Left(iter::once(local_net_address));
}
Some(local_net_address) => {
info!("Non-publicly routable external address {} not added to store", local_net_address);
None
info!("External address is not publicly routable {}", local_net_address);
}
None if !self.config.disable_upnp => {
let gateway = igd::search_gateway(Default::default()).ok()?;
let ip = IpAddress::new(gateway.get_external_ip().ok()?);
if !ip.is_publicly_routable() {
info!("Non-publicly routable external ip from gateway using upnp {} not added to store", ip);
return None;
}
info!("Got external ip from gateway using upnp: {ip}");

let default_port = self.config.default_p2p_port();
None => {}
};

let normalized_p2p_listen_address = self.config.p2p_listen_address.normalize(default_port);
let local_addr = if normalized_p2p_listen_address.ip.is_unspecified() {
SocketAddr::new(local_ip_address::local_ip().unwrap(), normalized_p2p_listen_address.port)
} else {
normalized_p2p_listen_address.into()
};
Right(self.routable_addresses_from_net_interfaces())
}

match gateway.add_port(
igd::PortMappingProtocol::TCP,
default_port,
local_addr,
UPNP_DEADLINE_SEC as u32,
UPNP_REGISTRATION_NAME,
) {
Ok(_) => {
info!("Added port mapping to default external port: {ip}:{default_port}");
Some((
NetAddress { ip, port: default_port },
Some(ExtendHelper { gateway, local_addr, external_port: default_port }),
))
}
Err(AddPortError::PortInUse {}) => {
let port = gateway
.add_any_port(igd::PortMappingProtocol::TCP, local_addr, UPNP_DEADLINE_SEC as u32, UPNP_REGISTRATION_NAME)
.ok()?;
info!("Added port mapping to random external port: {ip}:{port}");
Some((NetAddress { ip, port }, Some(ExtendHelper { gateway, local_addr, external_port: port })))
}
Err(err) => {
warn!("error adding port: {err}");
None
}
}
}
None => None,
fn routable_addresses_from_net_interfaces(&self) -> impl Iterator<Item = NetAddress> + '_ {
// check whatever was passed as listen address (if routable)
// otherwise(listen_address === 0.0.0.0) check all interfaces
let listen_address = self.config.p2p_listen_address.normalize(self.config.default_p2p_port());
if listen_address.ip.is_publicly_routable() {
info!("Publicly routable local address found: {}", listen_address.ip);
Left(Left(iter::once(listen_address)))
} else if listen_address.ip.is_unspecified() {
let network_interfaces = list_afinet_netifas();
let Ok(network_interfaces) = network_interfaces else {
warn!("Error getting network interfaces: {:?}", network_interfaces);
return Left(Right(iter::empty()));
};
// TODO: Add Check IPv4 or IPv6 match from Go code
Right(network_interfaces.into_iter().map(|(_, ip)| IpAddress::from(ip)).filter(|&ip| ip.is_publicly_routable()).map(
|ip| {
info!("Publicly routable local address found: {}", ip);
NetAddress::new(ip, self.config.default_p2p_port())
},
))
} else {
Left(Right(iter::empty()))
}
}
fn add_routable_addresses_from_net_interfaces(&mut self) {
// If listen_address === 0.0.0.0, bind all interfaces
// else, bind whatever was passed as listen address (if routable)
let listen_address = self.config.p2p_listen_address.normalize(self.config.default_p2p_port());

if listen_address.ip.is_unspecified() {
let network_interfaces = list_afinet_netifas();
fn upnp(&self) -> Result<Option<(NetAddress, ExtendHelper)>, UpnpError> {
info!("[UPnP] Attempting to register upnp... (to disable run the node with --disable-upnp)");
let gateway = igd::search_gateway(Default::default())?;
let ip = IpAddress::new(gateway.get_external_ip()?);
if !ip.is_publicly_routable() {
info!("[UPnP] Non-publicly routable external ip from gateway using upnp {} not added to store", ip);
return Ok(None);
}
info!("[UPnP] Got external ip from gateway using upnp: {ip}");

if let Ok(network_interfaces) = network_interfaces {
for (_, ip) in network_interfaces.iter() {
let curr_ip = IpAddress::new(*ip);
let default_port = self.config.default_p2p_port();

// TODO: Add Check IPv4 or IPv6 match from Go code
if curr_ip.is_publicly_routable() {
info!("Publicly routable local address {} added to store", curr_ip);
self.local_net_addresses.push(NetAddress { ip: curr_ip, port: self.config.default_p2p_port() });
} else {
debug!("Non-publicly routable interface address {} not added to store", curr_ip);
let normalized_p2p_listen_address = self.config.p2p_listen_address.normalize(default_port);
let local_addr = if normalized_p2p_listen_address.ip.is_unspecified() {
SocketAddr::new(local_ip_address::local_ip().unwrap(), normalized_p2p_listen_address.port)
} else {
normalized_p2p_listen_address.into()
};

// This loop checks for existing port mappings in the UPnP-enabled gateway.
//
// The goal of this loop is to identify if the desired external port (`default_port`) is
// already mapped to any device inside the local network. This is crucial because, in
// certain scenarios, gateways might not throw the `PortInUse` error but rather might
// silently remap the external port when there's a conflict. By iterating through the
// current mappings, we can make an informed decision about whether to attempt using
// the default port or request a new random one.
//
// The loop goes through all existing port mappings one-by-one:
// - If a mapping is found that uses the desired external port, the loop breaks with `already_in_use` set to true.
// - If the index is not valid (i.e., we've iterated through all the mappings), the loop breaks with `already_in_use` set to false.
// - Any other errors during fetching of port mappings are handled accordingly, but the end result is to exit the loop with the `already_in_use` flag set appropriately.
let mut index = 0;
let already_in_use = loop {
match gateway.get_generic_port_mapping_entry(index) {
Ok(entry) => {
if entry.enabled && entry.external_port == default_port {
info!("[UPnP] Found existing mapping that uses the same external port. Description: {}, external port: {}, internal port: {}, client: {}, lease duration: {}", entry.port_mapping_description, entry.external_port, entry.internal_port, entry.internal_client, entry.lease_duration);
break true;
}
index += 1;
}
} else {
warn!("Error getting network interfaces: {:?}", network_interfaces);
Err(GetGenericPortMappingEntryError::ActionNotAuthorized) => {
index += 1;
continue;
}
Err(GetGenericPortMappingEntryError::RequestError(err)) => {
warn!("[UPnP] request existing port mapping err: {:?}", err);
break false;
}
Err(GetGenericPortMappingEntryError::SpecifiedArrayIndexInvalid) => break false,
}
} else if listen_address.ip.is_publicly_routable() {
info!("Publicly routable P2P listen address {} added to store", listen_address.ip);
self.local_net_addresses.push(listen_address);
} else {
debug!("Non-publicly routable listen address {} not added to store.", listen_address.ip);
};
if already_in_use {
let port =
gateway.add_any_port(igd::PortMappingProtocol::TCP, local_addr, UPNP_DEADLINE_SEC as u32, UPNP_REGISTRATION_NAME)?;
info!("[UPnP] Added port mapping to random external port: {ip}:{port}");
return Ok(Some((NetAddress { ip, port }, ExtendHelper { gateway, local_addr, external_port: port })));
}

match gateway.add_port(
igd::PortMappingProtocol::TCP,
default_port,
local_addr,
UPNP_DEADLINE_SEC as u32,
UPNP_REGISTRATION_NAME,
) {
Ok(_) => {
info!("[UPnP] Added port mapping to default external port: {ip}:{default_port}");
Ok(Some((NetAddress { ip, port: default_port }, ExtendHelper { gateway, local_addr, external_port: default_port })))
}
Err(AddPortError::PortInUse {}) => {
let port = gateway.add_any_port(
igd::PortMappingProtocol::TCP,
local_addr,
UPNP_DEADLINE_SEC as u32,
UPNP_REGISTRATION_NAME,
)?;
info!("[UPnP] Added port mapping to random external port: {ip}:{port}");
Ok(Some((NetAddress { ip, port }, ExtendHelper { gateway, local_addr, external_port: port })))
}
Err(err) => Err(err.into()),
}
}

pub fn best_local_address(&mut self) -> Option<NetAddress> {
if self.local_net_addresses.is_empty() {
None
Expand Down
11 changes: 8 additions & 3 deletions components/addressmanager/src/port_mapping_extender.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use igd_next::{aio::tokio::Tokio, AddPortError};
use kaspa_core::{
debug, error,
debug, error, info,
task::{
service::{AsyncService, AsyncServiceFuture},
tick::{TickReason, TickService},
Expand Down Expand Up @@ -49,9 +49,9 @@ impl Extender {
)
.await
{
warn!("extend external ip mapping err: {e:?}");
warn!("[UPnP] Extend external ip mapping err: {e:?}");
} else {
debug!("extend external ip mapping");
debug!("[UPnP] Extend external ip mapping");
}
}
// Let the system print final logs before exiting
Expand Down Expand Up @@ -81,6 +81,11 @@ impl AsyncService for Extender {

fn stop(self: Arc<Self>) -> AsyncServiceFuture {
Box::pin(async move {
if let Err(err) = self.gateway.remove_port(igd_next::PortMappingProtocol::TCP, self.external_port).await {
warn!("[UPnP] Remove port mapping err: {err:?}");
} else {
info!("[UPnP] Successfully removed port mapping, external port: {}", self.external_port);
}
trace!("{} stopped", SERVICE_NAME);
Ok(())
})
Expand Down

0 comments on commit 19ea1a6

Please sign in to comment.