93 changes: 76 additions & 17 deletions safenode/src/network/mod.rs
Expand Up @@ -27,13 +27,12 @@ use crate::domain::storage::{DiskBackedRecordStore, DiskBackedRecordStoreConfig}
use crate::protocol::messages::{QueryResponse, Request, Response};
use futures::{future::select_all, StreamExt};
use libp2p::{
core::muxing::StreamMuxerBox,
identity,
kad::{KBucketKey, Kademlia, KademliaConfig, QueryId, Record, RecordKey},
mdns,
multiaddr::Protocol,
request_response::{self, Config as RequestResponseConfig, ProtocolSupport, RequestId},
swarm::{Swarm, SwarmBuilder},
swarm::{behaviour::toggle::Toggle, Swarm, SwarmBuilder},
Multiaddr, PeerId, Transport,
};
use std::{
Expand Down Expand Up @@ -86,6 +85,7 @@ pub struct SwarmDriver {
pending_get_closest_peers: PendingGetClosest,
pending_requests: HashMap<RequestId, oneshot::Sender<Result<Response>>>,
pending_query: HashMap<QueryId, oneshot::Sender<Result<QueryResponse>>>,
local: bool,
}

impl SwarmDriver {
Expand All @@ -105,6 +105,8 @@ impl SwarmDriver {
pub fn new(
addr: SocketAddr,
root_dir: &Path,
local: bool,
external_address: Option<SocketAddr>,
) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
let mut kad_cfg = KademliaConfig::default();
let _ = kad_cfg
Expand All @@ -126,22 +128,28 @@ impl SwarmDriver {
.set_record_ttl(None);

let (network, events_receiver, mut swarm_driver) =
Self::with(kad_cfg, false, Some(root_dir.join("record_store")))?;
Self::with(kad_cfg, local, false, Some(root_dir.join("record_store")))?;

// Listen on the provided address
let addr = Multiaddr::from(addr.ip())
.with(Protocol::Udp(addr.port()))
.with(Protocol::QuicV1);
let addr = Multiaddr::from(addr.ip()).with(Protocol::Tcp(addr.port()));
let _listener_id = swarm_driver
.swarm
.listen_on(addr)
.expect("Failed to listen on the provided address");

// Add external address if provided, which should get us discovered and reachable quicker.
if let Some(addr) = external_address {
let addr = Multiaddr::from(addr.ip()).with(Protocol::Tcp(addr.port()));
let _ = swarm_driver
.swarm
.add_external_address(addr, libp2p::swarm::AddressScore::Infinite);
}

Ok((network, events_receiver, swarm_driver))
}

/// Same as `new` API but creates the network components in client mode
pub fn new_client() -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
pub fn new_client(local: bool) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
// Create a Kademlia behaviour for client mode, i.e. set req/resp protocol
// to outbound-only mode and don't listen on any address
let mut kad_cfg = KademliaConfig::default(); // default query timeout is 60 secs
Expand All @@ -156,12 +164,13 @@ impl SwarmDriver {
NonZeroUsize::new(CLOSE_GROUP_SIZE).ok_or_else(|| Error::InvalidCloseGroupSize)?,
);

Self::with(kad_cfg, true, None)
Self::with(kad_cfg, local, true, None)
}

// Private helper to create the network components with the provided config and req/res behaviour
fn with(
kad_cfg: KademliaConfig,
local: bool,
is_client: bool,
disk_store_path: Option<PathBuf>,
) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
Expand Down Expand Up @@ -221,7 +230,12 @@ impl SwarmDriver {
query_interval: Duration::from_secs(5),
..Default::default()
};
mdns::tokio::Behaviour::new(cfg, peer_id)?
// Only enable mDNS if we are in local mode
let mdns = match local {
true => Some(mdns::tokio::Behaviour::new(cfg, peer_id)?),
false => None,
};
Toggle::from(mdns)
};

// Identify Behaviour
Expand All @@ -237,20 +251,32 @@ impl SwarmDriver {
};

// Transport
let transport = {
// use the QUIC Protocol for transport
let quic_config = libp2p_quic::Config::new(&keypair);
let transport = libp2p_quic::tokio::Transport::new(quic_config);
transport
.map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer)))
.boxed()
let transport =
libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().port_reuse(true))
.upgrade(libp2p::core::upgrade::Version::V1)
.authenticate(
libp2p::noise::Config::new(&keypair)
.expect("Signing libp2p-noise static DH keypair failed."),
)
.multiplex(libp2p::yamux::Config::default())
.boxed();

// Disable AutoNAT if we are in local mode.
let autonat = match local {
false => Some(libp2p::autonat::Behaviour::new(
peer_id,
libp2p::autonat::Config::default(),
)),
true => None,
};
let autonat = Toggle::from(autonat);

let behaviour = NodeBehaviour {
request_response,
kademlia,
mdns,
identify,
autonat,
};
let swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, peer_id).build();

Expand All @@ -264,6 +290,7 @@ impl SwarmDriver {
pending_get_closest_peers: Default::default(),
pending_requests: Default::default(),
pending_query: Default::default(),
local,
};

Ok((
Expand Down Expand Up @@ -530,6 +557,34 @@ impl Network {
}
}


// Verifies if `Multiaddr` contains IPv4 address that is not global.
// This is used to filter out unroutable addresses from the Kademlia routing table.
pub(crate) fn multiaddr_is_global(multiaddr: &Multiaddr) -> bool {
!multiaddr.iter().any(|addr| match addr {
Protocol::Ip4(ip) => {
// Based on the nightly `is_global` method (`Ipv4Addrs::is_global`), only using what is available in stable.
// Missing `is_shared`, `is_benchmarking` and `is_reserved`.
ip.is_unspecified()
| ip.is_private()
| ip.is_loopback()
| ip.is_link_local()
| ip.is_documentation()
| ip.is_broadcast()
}
_ => false,
})
}

// Strip out the p2p protocol from a multiaddr.
pub(crate) fn multiaddr_strip_p2p(multiaddr: &Multiaddr) -> Multiaddr {
multiaddr
.iter()
.filter(|p| !matches!(p, Protocol::P2p(_)))
.collect()
}


#[cfg(test)]
mod tests {
use super::SwarmDriver;
Expand Down Expand Up @@ -573,7 +628,9 @@ mod tests {
"0.0.0.0:0"
.parse::<SocketAddr>()
.expect("0.0.0.0:0 should parse into a valid `SocketAddr`"),
Path::new(""),
Path::new(""),
true,
None,
)?;
let _handle = tokio::spawn(driver.run());

Expand Down Expand Up @@ -639,6 +696,8 @@ mod tests {
.parse::<SocketAddr>()
.expect("0.0.0.0:0 should parse into a valid `SocketAddr`"),
Path::new(""),
true,
None,
)?;
let _driver_handle = tokio::spawn(driver.run());

Expand Down
7 changes: 6 additions & 1 deletion safenode/src/node/api.rs
Expand Up @@ -14,6 +14,7 @@ use super::{

use crate::{
domain::dbc_genesis::is_genesis_parent_tx,
network::multiaddr_strip_p2p,
network::{close_group_majority, MsgResponder, NetworkEvent, SwarmDriver, SwarmLocalState},
node::{RegisterStorage, Transfers},
protocol::{
Expand Down Expand Up @@ -86,8 +87,10 @@ impl Node {
addr: SocketAddr,
initial_peers: Vec<(PeerId, Multiaddr)>,
root_dir: &Path,
local: bool,
external_address: Option<SocketAddr>,
) -> Result<RunningNode> {
let (network, mut network_event_receiver, swarm_driver) = SwarmDriver::new(addr, root_dir)?;
let (network, mut network_event_receiver, swarm_driver) = SwarmDriver::new(addr, root_dir, local, external_address)?;
let node_events_channel = NodeEventsChannel::default();

let (transfer_action_sender, mut transfer_action_receiver) = mpsc::channel(100);
Expand Down Expand Up @@ -153,6 +156,8 @@ impl Node {
let peers = self.initial_peers.clone();
let _handle = spawn(async move {
for (peer_id, addr) in &peers {
// The addresses passed might contain the peer_id, which we already pass seperately.
let addr = multiaddr_strip_p2p(addr);
if let Err(err) = network.dial(*peer_id, addr.clone()).await {
tracing::error!("Failed to dial {peer_id}: {err:?}");
};
Expand Down