Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TCP/AutoNAT/Identify #227

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
74 changes: 74 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion safenode/Cargo.toml
Expand Up @@ -36,7 +36,7 @@ futures = "~0.3.13"
hex = "~0.4.3"
itertools = "~0.10.1"
lazy_static = "~1.4.0"
libp2p = { version="0.51", features = ["tokio", "dns", "kad", "macros", "mdns", "quic", "request-response", "identify"] }
libp2p = { version="0.51", features = ["tokio", "dns", "kad", "macros", "mdns", "quic", "request-response", "identify", "autonat", "mplex", "noise", "tcp", "yamux"] }
libp2p-quic = { version = "0.7.0-alpha.3", features = ["tokio"] }
opentelemetry = { version = "0.17", features = ["rt-tokio"], optional = true }
opentelemetry-otlp = { version = "0.10", optional = true }
Expand Down
16 changes: 15 additions & 1 deletion safenode/src/bin/safenode/main.rs
Expand Up @@ -81,6 +81,15 @@ struct Opt {
/// Enable the admin/ctrl RPC service by providing an IP and port for it to listen on.
#[clap(long)]
rpc: Option<SocketAddr>,

/// Assume we are running on a local network and enable mDNS peer discovery.
/// Defaults to false, which means we will only connect to peers specified by `--peer`.
#[clap(long)]
local: bool,

/// Specify external address, to advertise on the network.
#[clap(long)]
external_address: Option<SocketAddr>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -132,6 +141,8 @@ fn main() -> Result<()> {
node_socket_addr,
peers.clone(),
opt.rpc,
opt.local,
opt.external_address,
&log_dir,
&root_dir,
))?;
Expand All @@ -145,13 +156,16 @@ async fn start_node(
node_socket_addr: SocketAddr,
peers: Vec<(PeerId, Multiaddr)>,
rpc: Option<SocketAddr>,
local: bool,
external_address: Option<SocketAddr>,
log_dir: &str,
root_dir: &Path,
) -> Result<()> {
let started_instant = std::time::Instant::now();

info!("Starting node ...");
let running_node = Node::run(node_socket_addr, peers, root_dir).await?;
let running_node =
Node::run(node_socket_addr, peers, root_dir, local, external_address).await?;

// Channel to receive node ctrl cmds from RPC service (if enabled), and events monitoring task
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<NodeCtrl>(5);
Expand Down
3 changes: 2 additions & 1 deletion safenode/src/client/api.rs
Expand Up @@ -34,7 +34,8 @@ impl Client {
/// Instantiate a new client.
pub async fn new(signer: SecretKey, peers: Option<Vec<(PeerId, Multiaddr)>>) -> Result<Self> {
info!("Starting Kad swarm in client mode...");
let (network, mut network_event_receiver, swarm_driver) = SwarmDriver::new_client()?;
let (network, mut network_event_receiver, swarm_driver) =
SwarmDriver::new_client(peers.is_none())?;
info!("Client constructed network and swarm_driver");
let events_channel = ClientEventsChannel::default();
let client = Self {
Expand Down
53 changes: 44 additions & 9 deletions safenode/src/network/event.rs
Expand Up @@ -13,18 +13,20 @@ use super::{
};
use crate::{
domain::storage::DiskBackedRecordStore,
network::IDENTIFY_AGENT_STR,
network::{IDENTIFY_AGENT_STR, multiaddr_strip_p2p, multiaddr_is_global},
protocol::{
messages::{QueryResponse, Request, Response},
storage::Chunk,
},
};
use itertools::Itertools;
use libp2p::{
autonat,
kad::{GetRecordOk, Kademlia, KademliaEvent, QueryResult, K_VALUE},
mdns,
multiaddr::Protocol,
request_response::{self, ResponseChannel as PeerResponseChannel},
swarm::{NetworkBehaviour, SwarmEvent},
swarm::{behaviour::toggle::Toggle, NetworkBehaviour, SwarmEvent},
Multiaddr, PeerId,
};
use std::collections::{hash_map, HashSet};
Expand All @@ -36,8 +38,9 @@ use tracing::{info, warn};
pub(super) struct NodeBehaviour {
pub(super) request_response: request_response::Behaviour<MsgCodec>,
pub(super) kademlia: Kademlia<DiskBackedRecordStore>,
pub(super) mdns: mdns::tokio::Behaviour,
pub(super) mdns: Toggle<mdns::tokio::Behaviour>,
pub(super) identify: libp2p::identify::Behaviour,
pub(super) autonat: Toggle<autonat::Behaviour>,
}

#[derive(Debug)]
Expand All @@ -46,6 +49,7 @@ pub(super) enum NodeEvent {
Kademlia(KademliaEvent),
Mdns(Box<mdns::Event>),
Identify(Box<libp2p::identify::Event>),
Autonat(autonat::Event),
}

impl From<request_response::Event<Request, Response>> for NodeEvent {
Expand All @@ -72,6 +76,12 @@ impl From<libp2p::identify::Event> for NodeEvent {
}
}

impl From<autonat::Event> for NodeEvent {
fn from(event: autonat::Event) -> Self {
NodeEvent::Autonat(event)
}
}

#[derive(Debug)]
/// Channel to send the `Response` through.
pub enum MsgResponder {
Expand Down Expand Up @@ -193,12 +203,30 @@ impl SwarmDriver {
}
},
SwarmEvent::Behaviour(NodeEvent::Identify(iden)) => {
info!("IdentifyEvent: {iden:?}");
match *iden {
libp2p::identify::Event::Received { peer_id, info } => {
info!("Adding peer to routing table, based on received identify info from {peer_id:?}: {info:?}");
info!(%peer_id, ?info, "identify: received info");
if info.agent_version.starts_with(IDENTIFY_AGENT_STR) {
for multiaddr in info.listen_addrs {
let addrs = match self.local {
true => info.listen_addrs,
// If we're not in local mode, only add globally reachable addresses
false => info
.listen_addrs
.into_iter()
.filter(multiaddr_is_global)
.collect()
,
};
// Strip the `/p2p/...` part of the multiaddresses
let addrs: Vec<_> = addrs
.into_iter()
.map(|addr| multiaddr_strip_p2p(&addr))
// And deduplicate the list
.unique()
.collect();

info!(%peer_id, ?addrs, "identify: adding addresses");
for multiaddr in addrs {
let _routing_update = self
.swarm
.behaviour_mut()
Expand All @@ -207,9 +235,9 @@ impl SwarmDriver {
}
}
}
libp2p::identify::Event::Sent { .. } => {}
libp2p::identify::Event::Pushed { .. } => {}
libp2p::identify::Event::Error { .. } => {}
libp2p::identify::Event::Sent { .. } => info!("identify: {iden:?}"),
libp2p::identify::Event::Pushed { .. } => info!("identify: {iden:?}"),
libp2p::identify::Event::Error { .. } => info!("identify: {iden:?}"),
}
}
SwarmEvent::Behaviour(NodeEvent::Mdns(mdns_event)) => match *mdns_event {
Expand Down Expand Up @@ -274,6 +302,13 @@ impl SwarmDriver {
}
SwarmEvent::IncomingConnectionError { .. } => {}
SwarmEvent::Dialing(peer_id) => info!("Dialing {peer_id}"),
SwarmEvent::Behaviour(NodeEvent::Autonat(event)) => match event {
autonat::Event::InboundProbe(e) => trace!("AutoNAT inbound probe: {e:?}"),
autonat::Event::OutboundProbe(e) => trace!("AutoNAT outbound probe: {e:?}"),
autonat::Event::StatusChanged { old, new } => {
info!("AutoNAT status changed: {old:?} -> {new:?}");
}
},
todo => error!("SwarmEvent has not been implemented: {todo:?}"),
}
Ok(())
Expand Down