Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor 'addressmanager', reorder fetching sources for-external ip #300

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/addressmanager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ parking_lot.workspace = true
borsh.workspace = true
log.workspace = true
tokio.workspace = true

thiserror.workspace = true
local-ip-address = "0.5.3"
igd-next = { version = "0.14.2", features = ["aio_tokio"] }

Expand Down
267 changes: 166 additions & 101 deletions components/addressmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,25 @@ mod port_mapping_extender;
mod stores;
extern crate self as address_manager;

use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
use std::{collections::HashSet, iter, net::SocketAddr, sync::Arc, time::Duration};

use address_manager::port_mapping_extender::Extender;
use igd_next::{self as igd, aio::tokio::Tokio, AddPortError, Gateway};
use itertools::Itertools;
use igd_next::{
self as igd, aio::tokio::Tokio, AddAnyPortError, AddPortError, Gateway, GetExternalIpError, GetGenericPortMappingEntryError,
SearchError,
};
use itertools::{
Either::{Left, Right},
Itertools,
};
use kaspa_consensus_core::config::Config;
use kaspa_core::task::tick::TickService;
use kaspa_core::{debug, info, time::unix_now, warn};
use kaspa_core::{debug, info, task::tick::TickService, time::unix_now, warn};
use kaspa_database::prelude::{StoreResultExtensions, DB};
use kaspa_utils::networking::IpAddress;
use local_ip_address::list_afinet_netifas;
use parking_lot::Mutex;
use stores::banned_address_store::{BannedAddressesStore, BannedAddressesStoreReader, ConnectionBanTimestamp, DbBannedAddressesStore};
use thiserror::Error;

pub use stores::NetAddress;

Expand All @@ -33,6 +39,18 @@ struct ExtendHelper {
external_port: u16,
}

#[derive(Error, Debug)]
pub enum UpnpError {
#[error(transparent)]
AddPortError(#[from] AddPortError),
#[error(transparent)]
AddAnyPortError(#[from] AddAnyPortError),
#[error(transparent)]
SearchError(#[from] SearchError),
#[error(transparent)]
GetExternalIpError(#[from] GetExternalIpError),
}

pub struct AddressManager {
banned_address_store: DbBannedAddressesStore,
address_store: address_store_with_cache::Store,
Expand All @@ -55,124 +73,171 @@ impl AddressManager {
}

fn init_local_addresses(&mut self, tick_service: Arc<TickService>) -> Option<Extender> {
if let Some((net_addr, extend)) = self.configured_address() {
self.local_net_addresses.push(net_addr);

if let Some(ExtendHelper { gateway, local_addr, external_port }) = extend {
let gateway: igd_next::aio::Gateway<Tokio> = igd_next::aio::Gateway {
addr: gateway.addr,
root_url: gateway.root_url,
control_url: gateway.control_url,
control_schema_url: gateway.control_schema_url,
control_schema: gateway.control_schema,
provider: Tokio,
};
Some(Extender::new(
tick_service,
Duration::from_secs(UPNP_EXTEND_PERIOD),
UPNP_DEADLINE_SEC,
gateway,
external_port,
local_addr,
))
} else {
None
}
self.local_net_addresses = self.local_addresses().collect();

let extender = if self.local_net_addresses.is_empty() && !self.config.disable_upnp {
let (net_address, ExtendHelper { gateway, local_addr, external_port }) = match self.upnp() {
Err(err) => {
warn!("[UPnP] Error adding port mapping: {err}");
return None;
}
Ok(None) => return None,
Ok(Some((net_address, extend_helper))) => (net_address, extend_helper),
};
self.local_net_addresses.push(net_address);

let gateway: igd_next::aio::Gateway<Tokio> = igd_next::aio::Gateway {
addr: gateway.addr,
root_url: gateway.root_url,
control_url: gateway.control_url,
control_schema_url: gateway.control_schema_url,
control_schema: gateway.control_schema,
provider: Tokio,
};
Some(Extender::new(
tick_service,
Duration::from_secs(UPNP_EXTEND_PERIOD),
UPNP_DEADLINE_SEC,
gateway,
external_port,
local_addr,
))
} else {
self.add_routable_addresses_from_net_interfaces();
None
}
};

self.local_net_addresses.iter().for_each(|net_addr| {
info!("Publicly routable local address {} added to store", net_addr);
});
extender
}

fn configured_address(&self) -> Option<(NetAddress, Option<ExtendHelper>)> {
fn local_addresses(&self) -> impl Iterator<Item = NetAddress> + '_ {
match self.config.externalip {
// An external IP was passed, we will try to bind that if it's valid
Some(local_net_address) if local_net_address.ip.is_publicly_routable() => {
info!("External address {} added to store", local_net_address);
Some((local_net_address, None))
info!("External address is publicly routable {}", local_net_address);
return Left(iter::once(local_net_address));
}
Some(local_net_address) => {
info!("Non-publicly routable external address {} not added to store", local_net_address);
None
info!("External address is not publicly routable {}", local_net_address);
}
None if !self.config.disable_upnp => {
let gateway = igd::search_gateway(Default::default()).ok()?;
let ip = IpAddress::new(gateway.get_external_ip().ok()?);
if !ip.is_publicly_routable() {
info!("Non-publicly routable external ip from gateway using upnp {} not added to store", ip);
return None;
}
info!("Got external ip from gateway using upnp: {ip}");

let default_port = self.config.default_p2p_port();
None => {}
};

let normalized_p2p_listen_address = self.config.p2p_listen_address.normalize(default_port);
let local_addr = if normalized_p2p_listen_address.ip.is_unspecified() {
SocketAddr::new(local_ip_address::local_ip().unwrap(), normalized_p2p_listen_address.port)
} else {
normalized_p2p_listen_address.into()
};
Right(self.routable_addresses_from_net_interfaces())
}

match gateway.add_port(
igd::PortMappingProtocol::TCP,
default_port,
local_addr,
UPNP_DEADLINE_SEC as u32,
UPNP_REGISTRATION_NAME,
) {
Ok(_) => {
info!("Added port mapping to default external port: {ip}:{default_port}");
Some((
NetAddress { ip, port: default_port },
Some(ExtendHelper { gateway, local_addr, external_port: default_port }),
))
}
Err(AddPortError::PortInUse {}) => {
let port = gateway
.add_any_port(igd::PortMappingProtocol::TCP, local_addr, UPNP_DEADLINE_SEC as u32, UPNP_REGISTRATION_NAME)
.ok()?;
info!("Added port mapping to random external port: {ip}:{port}");
Some((NetAddress { ip, port }, Some(ExtendHelper { gateway, local_addr, external_port: port })))
}
Err(err) => {
warn!("error adding port: {err}");
None
}
}
}
None => None,
fn routable_addresses_from_net_interfaces(&self) -> impl Iterator<Item = NetAddress> + '_ {
// check whatever was passed as listen address (if routable)
// otherwise(listen_address === 0.0.0.0) check all interfaces
let listen_address = self.config.p2p_listen_address.normalize(self.config.default_p2p_port());
if listen_address.ip.is_publicly_routable() {
info!("Publicly routable local address found: {}", listen_address.ip);
Left(Left(iter::once(listen_address)))
} else if listen_address.ip.is_unspecified() {
let network_interfaces = list_afinet_netifas();
let Ok(network_interfaces) = network_interfaces else {
warn!("Error getting network interfaces: {:?}", network_interfaces);
return Left(Right(iter::empty()));
};
// TODO: Add Check IPv4 or IPv6 match from Go code
Right(network_interfaces.into_iter().map(|(_, ip)| IpAddress::from(ip)).filter(|&ip| ip.is_publicly_routable()).map(
|ip| {
info!("Publicly routable local address found: {}", ip);
NetAddress::new(ip, self.config.default_p2p_port())
},
))
} else {
Left(Right(iter::empty()))
}
}
fn add_routable_addresses_from_net_interfaces(&mut self) {
// If listen_address === 0.0.0.0, bind all interfaces
// else, bind whatever was passed as listen address (if routable)
let listen_address = self.config.p2p_listen_address.normalize(self.config.default_p2p_port());

if listen_address.ip.is_unspecified() {
let network_interfaces = list_afinet_netifas();
fn upnp(&self) -> Result<Option<(NetAddress, ExtendHelper)>, UpnpError> {
info!("[UPnP] Attempting to register upnp... (to disable run the node with --disable-upnp)");
let gateway = igd::search_gateway(Default::default())?;
let ip = IpAddress::new(gateway.get_external_ip()?);
if !ip.is_publicly_routable() {
info!("[UPnP] Non-publicly routable external ip from gateway using upnp {} not added to store", ip);
return Ok(None);
}
info!("[UPnP] Got external ip from gateway using upnp: {ip}");

if let Ok(network_interfaces) = network_interfaces {
for (_, ip) in network_interfaces.iter() {
let curr_ip = IpAddress::new(*ip);
let default_port = self.config.default_p2p_port();

// TODO: Add Check IPv4 or IPv6 match from Go code
if curr_ip.is_publicly_routable() {
info!("Publicly routable local address {} added to store", curr_ip);
self.local_net_addresses.push(NetAddress { ip: curr_ip, port: self.config.default_p2p_port() });
} else {
debug!("Non-publicly routable interface address {} not added to store", curr_ip);
let normalized_p2p_listen_address = self.config.p2p_listen_address.normalize(default_port);
let local_addr = if normalized_p2p_listen_address.ip.is_unspecified() {
SocketAddr::new(local_ip_address::local_ip().unwrap(), normalized_p2p_listen_address.port)
} else {
normalized_p2p_listen_address.into()
};

// This loop checks for existing port mappings in the UPnP-enabled gateway.
//
// The goal of this loop is to identify if the desired external port (`default_port`) is
// already mapped to any device inside the local network. This is crucial because, in
// certain scenarios, gateways might not throw the `PortInUse` error but rather might
// silently remap the external port when there's a conflict. By iterating through the
// current mappings, we can make an informed decision about whether to attempt using
// the default port or request a new random one.
//
// The loop goes through all existing port mappings one-by-one:
// - If a mapping is found that uses the desired external port, the loop breaks with `already_in_use` set to true.
// - If the index is not valid (i.e., we've iterated through all the mappings), the loop breaks with `already_in_use` set to false.
// - Any other errors during fetching of port mappings are handled accordingly, but the end result is to exit the loop with the `already_in_use` flag set appropriately.
let mut index = 0;
let already_in_use = loop {
biryukovmaxim marked this conversation as resolved.
Show resolved Hide resolved
match gateway.get_generic_port_mapping_entry(index) {
Ok(entry) => {
if entry.enabled && entry.external_port == default_port {
info!("[UPnP] Found existing mapping that uses the same external port. Description: {}, external port: {}, internal port: {}, client: {}, lease duration: {}", entry.port_mapping_description, entry.external_port, entry.internal_port, entry.internal_client, entry.lease_duration);
break true;
}
index += 1;
}
} else {
warn!("Error getting network interfaces: {:?}", network_interfaces);
Err(GetGenericPortMappingEntryError::ActionNotAuthorized) => {
index += 1;
continue;
}
Err(GetGenericPortMappingEntryError::RequestError(err)) => {
warn!("[UPnP] request existing port mapping err: {:?}", err);
break false;
}
Err(GetGenericPortMappingEntryError::SpecifiedArrayIndexInvalid) => break false,
}
} else if listen_address.ip.is_publicly_routable() {
info!("Publicly routable P2P listen address {} added to store", listen_address.ip);
self.local_net_addresses.push(listen_address);
} else {
debug!("Non-publicly routable listen address {} not added to store.", listen_address.ip);
};
if already_in_use {
let port =
gateway.add_any_port(igd::PortMappingProtocol::TCP, local_addr, UPNP_DEADLINE_SEC as u32, UPNP_REGISTRATION_NAME)?;
info!("[UPnP] Added port mapping to random external port: {ip}:{port}");
return Ok(Some((NetAddress { ip, port }, ExtendHelper { gateway, local_addr, external_port: port })));
}

match gateway.add_port(
igd::PortMappingProtocol::TCP,
default_port,
local_addr,
UPNP_DEADLINE_SEC as u32,
UPNP_REGISTRATION_NAME,
) {
Ok(_) => {
info!("[UPnP] Added port mapping to default external port: {ip}:{default_port}");
Ok(Some((NetAddress { ip, port: default_port }, ExtendHelper { gateway, local_addr, external_port: default_port })))
}
Err(AddPortError::PortInUse {}) => {
let port = gateway.add_any_port(
igd::PortMappingProtocol::TCP,
local_addr,
UPNP_DEADLINE_SEC as u32,
UPNP_REGISTRATION_NAME,
)?;
info!("[UPnP] Added port mapping to random external port: {ip}:{port}");
Ok(Some((NetAddress { ip, port }, ExtendHelper { gateway, local_addr, external_port: port })))
}
Err(err) => Err(err.into()),
}
}

pub fn best_local_address(&mut self) -> Option<NetAddress> {
if self.local_net_addresses.is_empty() {
None
Expand Down
11 changes: 8 additions & 3 deletions components/addressmanager/src/port_mapping_extender.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use igd_next::{aio::tokio::Tokio, AddPortError};
use kaspa_core::{
debug, error,
debug, error, info,
task::{
service::{AsyncService, AsyncServiceFuture},
tick::{TickReason, TickService},
Expand Down Expand Up @@ -49,9 +49,9 @@ impl Extender {
)
.await
{
warn!("extend external ip mapping err: {e:?}");
warn!("[UPnP] Extend external ip mapping err: {e:?}");
} else {
debug!("extend external ip mapping");
debug!("[UPnP] Extend external ip mapping");
}
}
// Let the system print final logs before exiting
Expand Down Expand Up @@ -81,6 +81,11 @@ impl AsyncService for Extender {

fn stop(self: Arc<Self>) -> AsyncServiceFuture {
Box::pin(async move {
if let Err(err) = self.gateway.remove_port(igd_next::PortMappingProtocol::TCP, self.external_port).await {
warn!("[UPnP] Remove port mapping err: {err:?}");
} else {
info!("[UPnP] Successfully removed port mapping, external port: {}", self.external_port);
}
trace!("{} stopped", SERVICE_NAME);
Ok(())
})
Expand Down