Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UPnP support and configurations to Kaspad #288

Merged
merged 10 commits into from
Oct 24, 2023
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
target
web-root
/.idea/*
**/.idea/
/rust-toolchain
/.vscode/
**/db-*
Expand Down
47 changes: 47 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions components/addressmanager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ rand.workspace = true
parking_lot.workspace = true
borsh.workspace = true
log.workspace = true
tokio.workspace = true

local-ip-address = "0.5.3"
igd-next = { version = "0.14.2", features = ["aio_tokio"] }

[dev-dependencies]
statrs = "0.13.0"
Expand Down
163 changes: 123 additions & 40 deletions components/addressmanager/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,37 @@
mod port_mapping_extender;
mod stores;

extern crate self as address_manager;

use std::{collections::HashSet, sync::Arc};
use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};

use address_manager::port_mapping_extender::Extender;
use igd_next::{self as igd, aio::tokio::Tokio, AddPortError, Gateway};
use itertools::Itertools;
use kaspa_consensus_core::config::Config;
use kaspa_core::task::tick::TickService;
use kaspa_core::{debug, info, time::unix_now, warn};
use kaspa_database::prelude::{StoreResultExtensions, DB};
use kaspa_utils::networking::IpAddress;
use local_ip_address::list_afinet_netifas;
use parking_lot::Mutex;

use stores::banned_address_store::{BannedAddressesStore, BannedAddressesStoreReader, ConnectionBanTimestamp, DbBannedAddressesStore};

pub use stores::NetAddress;

const MAX_ADDRESSES: usize = 4096;
const MAX_CONNECTION_FAILED_COUNT: u64 = 3;

const UPNP_DEADLINE_SEC: u64 = 2 * 60;
const UPNP_EXTEND_PERIOD: u64 = UPNP_DEADLINE_SEC / 2;

const APP_NAME: &str = "rusty-kaspa";

struct ExtendHelper {
gateway: Gateway,
local_addr: SocketAddr,
external_port: u16,
}

pub struct AddressManager {
banned_address_store: DbBannedAddressesStore,
address_store: address_store_with_cache::Store,
Expand All @@ -27,63 +40,132 @@ pub struct AddressManager {
}

impl AddressManager {
pub fn new(config: Arc<Config>, db: Arc<DB>) -> Arc<Mutex<Self>> {
pub fn new(config: Arc<Config>, db: Arc<DB>, tick_service: Arc<TickService>) -> (Arc<Mutex<Self>>, Option<Extender>) {
let mut instance = Self {
banned_address_store: DbBannedAddressesStore::new(db.clone(), MAX_ADDRESSES as u64),
address_store: address_store_with_cache::new(db),
local_net_addresses: Vec::new(),
config,
};

instance.init_local_addresses();
let extender = instance.init_local_addresses(tick_service);

Arc::new(Mutex::new(instance))
(Arc::new(Mutex::new(instance)), extender)
}

fn init_local_addresses(&mut self) {
fn init_local_addresses(&mut self, tick_service: Arc<TickService>) -> Option<Extender> {
if let Some((net_addr, extend)) = self.configured_address() {
self.local_net_addresses.push(net_addr);

if let Some(ExtendHelper { gateway, local_addr, external_port }) = extend {
let gateway: igd_next::aio::Gateway<Tokio> = igd_next::aio::Gateway {
addr: gateway.addr,
root_url: gateway.root_url,
control_url: gateway.control_url,
control_schema_url: gateway.control_schema_url,
control_schema: gateway.control_schema,
provider: Tokio,
};
Some(Extender::new(
tick_service,
Duration::from_secs(UPNP_EXTEND_PERIOD),
UPNP_DEADLINE_SEC,
gateway,
external_port,
local_addr,
))
} else {
None
}
} else {
self.add_routable_addresses_from_net_interfaces();
None
}
}

fn configured_address(&self) -> Option<(NetAddress, Option<ExtendHelper>)> {
match self.config.externalip {
// An external IP was passed, we will try to bind that if it's valid
Some(local_net_address) if local_net_address.ip.is_publicly_routable() => {
info!("External address {} added to store", local_net_address);
Some((local_net_address, None))
}
Some(local_net_address) => {
// An external IP was passed, we will try to bind that if it's valid
if local_net_address.is_publicly_routable() {
info!("External ip {} added to store", local_net_address);
self.local_net_addresses.push(NetAddress { ip: local_net_address, port: self.config.default_p2p_port() });
info!("Non-publicly routable external address {} not added to store", local_net_address);
None
}
None if !self.config.disable_upnp => {
let gateway = igd::search_gateway(Default::default()).ok()?;
let ip = IpAddress::new(gateway.get_external_ip().ok()?);
if !ip.is_publicly_routable() {
info!("Non-publicly routable external ip from gateway using upnp {} not added to store", ip);
return None;
}
info!("Got external ip from gateway using upnp: {ip}");

let default_port = self.config.default_p2p_port();

let normalized_p2p_listen_address = self.config.p2p_listen_address.normalize(default_port);
let local_addr = if normalized_p2p_listen_address.ip.is_unspecified() {
SocketAddr::new(local_ip_address::local_ip().unwrap(), normalized_p2p_listen_address.port)
} else {
debug!("Non-publicly routable external ip {} not added to store", local_net_address);
normalized_p2p_listen_address.into()
};

match gateway.add_port(igd::PortMappingProtocol::TCP, default_port, local_addr, UPNP_DEADLINE_SEC as u32, APP_NAME) {
Ok(_) => {
info!("Added port mapping to default external port: {ip}:{default_port}");
Some((
NetAddress { ip, port: default_port },
Some(ExtendHelper { gateway, local_addr, external_port: default_port }),
))
}
Err(AddPortError::PortInUse {}) => {
let port = gateway
.add_any_port(igd::PortMappingProtocol::TCP, local_addr, UPNP_DEADLINE_SEC as u32, APP_NAME)
.ok()?;
info!("Added port mapping to random external port: {ip}:{port}");
Some((NetAddress { ip, port }, Some(ExtendHelper { gateway, local_addr, external_port: port })))
}
Err(err) => {
warn!("error adding port: {err}");
None
}
}
}
None => {
// If listen_address === 0.0.0.0, bind all interfaces
// else, bind whatever was passed as listen address (if routable)
let listen_address = self.config.p2p_listen_address.normalize(self.config.default_p2p_port());

if listen_address.ip.is_unspecified() {
let network_interfaces = list_afinet_netifas();

if let Ok(network_interfaces) = network_interfaces {
for (_, ip) in network_interfaces.iter() {
let curr_ip = IpAddress::new(*ip);

// TODO: Add Check IPv4 or IPv6 match from Go code
if curr_ip.is_publicly_routable() {
info!("Publicly routable local address {} added to store", curr_ip);
self.local_net_addresses.push(NetAddress { ip: curr_ip, port: self.config.default_p2p_port() });
} else {
debug!("Non-publicly routable interface address {} not added to store", curr_ip);
}
}
None => None,
}
}
fn add_routable_addresses_from_net_interfaces(&mut self) {
// If listen_address === 0.0.0.0, bind all interfaces
// else, bind whatever was passed as listen address (if routable)
let listen_address = self.config.p2p_listen_address.normalize(self.config.default_p2p_port());

if listen_address.ip.is_unspecified() {
let network_interfaces = list_afinet_netifas();

if let Ok(network_interfaces) = network_interfaces {
for (_, ip) in network_interfaces.iter() {
let curr_ip = IpAddress::new(*ip);

// TODO: Add Check IPv4 or IPv6 match from Go code
if curr_ip.is_publicly_routable() {
info!("Publicly routable local address {} added to store", curr_ip);
self.local_net_addresses.push(NetAddress { ip: curr_ip, port: self.config.default_p2p_port() });
} else {
warn!("Error getting network interfaces: {:?}", network_interfaces);
debug!("Non-publicly routable interface address {} not added to store", curr_ip);
}
} else if listen_address.ip.is_publicly_routable() {
info!("Publicly routable P2P listen address {} added to store", listen_address.ip);
self.local_net_addresses.push(listen_address);
} else {
debug!("Non-publicly routable listen address {} not added to store.", listen_address.ip);
}
} else {
warn!("Error getting network interfaces: {:?}", network_interfaces);
}
} else if listen_address.ip.is_publicly_routable() {
info!("Publicly routable P2P listen address {} added to store", listen_address.ip);
self.local_net_addresses.push(listen_address);
} else {
debug!("Non-publicly routable listen address {} not added to store.", listen_address.ip);
}
}

pub fn best_local_address(&mut self) -> Option<NetAddress> {
if self.local_net_addresses.is_empty() {
None
Expand Down Expand Up @@ -358,6 +440,7 @@ mod address_store_with_cache {
use super::*;
use address_manager::AddressManager;
use kaspa_consensus_core::config::{params::SIMNET_PARAMS, Config};
use kaspa_core::task::tick::TickService;
use kaspa_database::create_temp_db;
use kaspa_database::prelude::ConnBuilder;
use kaspa_utils::networking::IpAddress;
Expand Down Expand Up @@ -390,7 +473,7 @@ mod address_store_with_cache {

let db = create_temp_db!(ConnBuilder::default());
let config = Config::new(SIMNET_PARAMS);
let am = AddressManager::new(Arc::new(config), db.1);
let (am, _) = AddressManager::new(Arc::new(config), db.1, Arc::new(TickService::default()));

let mut am_guard = am.lock();

Expand Down