Skip to content

Commit

Permalink
refactor(*): rework NodeAddr (#1506)
Browse files Browse the repository at this point in the history
## Description

- Adds a `AddrInfo` that contains the addressing information of peers.
This is necessary as it's own type since it's serialized both in gossip
and in #1488
- Renames `NodeAddr` to `PeerData` (better name suggestions are well
received) this helps consolidate the "peer" terminology already present
throughout gossip, sync, downloader, and many other places
- Use this type in different parts of the code. Some other types that
can be replaced by this are left to #1493

## Notes & open questions

This applies the suggestion of doing `connect` using the full type. It's
debatable if this is better than `connect(PublicKey, AddrInfo)` but both
are imo an improvement over the current way since the idea if "what does
iroh need to connect to a peer" is now fully described in a type

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [ ] Tests if relevant.
  • Loading branch information
divagant-martian committed Sep 21, 2023
1 parent adbfe65 commit f16e439
Show file tree
Hide file tree
Showing 20 changed files with 217 additions and 206 deletions.
40 changes: 8 additions & 32 deletions iroh-gossip/examples/chat.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, fmt, net::SocketAddr, str::FromStr, sync::Arc};
use std::{collections::HashMap, fmt, str::FromStr, sync::Arc};

use anyhow::{bail, Context};
use bytes::Bytes;
Expand All @@ -13,7 +13,7 @@ use iroh_net::{
derp::DerpMap,
key::{PublicKey, SecretKey},
magic_endpoint::accept_conn,
MagicEndpoint,
MagicEndpoint, PeerAddr,
};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -117,7 +117,7 @@ async fn main() -> anyhow::Result<()> {
let gossip_cell = gossip_cell.clone();
let notify = notify.clone();
Box::new(move |endpoints| {
// send our updated endpoints to the gossip protocol to be sent as PeerData to peers
// send our updated endpoints to the gossip protocol to be sent as PeerAddr to peers
if let Some(gossip) = gossip_cell.get() {
gossip.update_endpoints(endpoints).ok();
}
Expand All @@ -144,8 +144,8 @@ async fn main() -> anyhow::Result<()> {

// print a ticket that includes our own peer id and endpoint addresses
let ticket = {
let me = PeerAddr::from_endpoint(&endpoint).await?;
let peers = peers.iter().chain([&me]).cloned().collect();
let me = endpoint.my_addr().await?;
let peers = peers.iter().cloned().chain([me]).collect();
Ticket { topic, peers }
};
println!("> ticket to join us: {ticket}");
Expand All @@ -154,18 +154,16 @@ async fn main() -> anyhow::Result<()> {
tokio::spawn(endpoint_loop(endpoint.clone(), gossip.clone()));

// join the gossip topic by connecting to known peers, if any
let peer_ids = peers.iter().map(|p| p.peer_id).collect();
if peers.is_empty() {
println!("> waiting for peers to join us...");
} else {
println!("> trying to connect to {} peers...", peers.len());
// add the peer addrs from the ticket to our endpoint's addressbook so that they can be dialed
for peer in &peers {
endpoint
.add_known_addrs(peer.peer_id, peer.derp_region, &peer.addrs)
.await?;
for peer in peers.into_iter() {
endpoint.add_peer_addr(peer).await?;
}
};
let peer_ids = peers.iter().map(|p| p.peer_id).collect();
gossip.join(topic, peer_ids).await?.await?;
println!("> connected!");

Expand Down Expand Up @@ -305,28 +303,6 @@ impl Ticket {
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
struct PeerAddr {
peer_id: PublicKey,
addrs: Vec<SocketAddr>,
derp_region: Option<u16>,
}

impl PeerAddr {
pub async fn from_endpoint(endpoint: &MagicEndpoint) -> anyhow::Result<Self> {
Ok(Self {
peer_id: endpoint.peer_id(),
derp_region: endpoint.my_derp().await,
addrs: endpoint
.local_endpoints()
.await?
.iter()
.map(|ep| ep.addr)
.collect(),
})
}
}

/// Serializes to base32.
impl fmt::Display for Ticket {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand Down
63 changes: 19 additions & 44 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
//! Networking for the `iroh-gossip` protocol

use std::{
collections::HashMap, fmt, future::Future, net::SocketAddr, sync::Arc, task::Poll,
time::Instant,
};

use anyhow::{anyhow, Context};
use bytes::{Bytes, BytesMut};
use futures::{stream::Stream, FutureExt};
use genawaiter::sync::{Co, Gen};
use iroh_net::magic_endpoint::AddrInfo;
use iroh_net::{key::PublicKey, magic_endpoint::get_peer_id, MagicEndpoint};
use rand::rngs::StdRng;
use rand_core::SeedableRng;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt, future::Future, sync::Arc, task::Poll, time::Instant};
use tokio::{
sync::{broadcast, mpsc, oneshot, watch},
task::JoinHandle,
Expand Down Expand Up @@ -124,7 +120,7 @@ impl Gossip {
///
///
/// This method only asks for [`PublicKey`]s. You must supply information on how to
/// connect to these peers manually before, by calling [`MagicEndpoint::add_known_addrs`] on
/// connect to these peers manually before, by calling [`MagicEndpoint::add_peer_addr`] on
/// the underlying [`MagicEndpoint`].
///
/// This method returns a future that completes once the request reached the local actor.
Expand Down Expand Up @@ -253,32 +249,6 @@ impl Future for JoinTopicFut {
}
}

/// Addressing information for peers.
///
/// This struct is serialized and transmitted to peers in `Join` and `ForwardJoin` messages.
/// It contains the information needed by `iroh-net` to connect to peers.
///
/// TODO: Replace with type from iroh-net
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct IrohInfo {
addrs: Vec<SocketAddr>,
derp_region: Option<u16>,
}

impl IrohInfo {
async fn from_endpoint(endpoint: &MagicEndpoint) -> anyhow::Result<Self> {
Ok(Self {
addrs: endpoint
.local_endpoints()
.await?
.iter()
.map(|ep| ep.addr)
.collect(),
derp_region: endpoint.my_derp().await,
})
}
}

/// Whether a connection is initiated by us (Dial) or by the remote peer (Accept)
#[derive(Debug)]
enum ConnOrigin {
Expand Down Expand Up @@ -370,7 +340,7 @@ impl Actor {
}
},
_ = self.on_endpoints_rx.changed() => {
let info = IrohInfo::from_endpoint(&self.endpoint).await?;
let info = self.endpoint.my_addr().await?;
let peer_data = postcard::to_stdvec(&info)?;
self.handle_in_event(InEvent::UpdatePeerData(peer_data.into()), Instant::now()).await?;
}
Expand Down Expand Up @@ -533,15 +503,15 @@ impl Actor {
self.pending_sends.remove(&peer);
self.dialer.abort_dial(&peer);
}
OutEvent::PeerData(peer, data) => match postcard::from_bytes::<IrohInfo>(&data) {
OutEvent::PeerData(peer, data) => match postcard::from_bytes::<AddrInfo>(&data) {
Err(err) => warn!("Failed to decode PeerData from {peer}: {err}"),
Ok(info) => {
debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known addrs: {info:?}");
if let Err(err) = self
.endpoint
.add_known_addrs(peer, info.derp_region, &info.addrs)
.await
{
let peer_addr = iroh_net::PeerAddr {
peer_id: peer,
info,
};
if let Err(err) = self.endpoint.add_peer_addr(peer_addr).await {
debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known failed: {err:?}");
}
}
Expand Down Expand Up @@ -623,6 +593,7 @@ async fn connection_loop(
mod test {
use std::time::Duration;

use iroh_net::PeerAddr;
use iroh_net::{derp::DerpMap, MagicEndpoint};
use tokio::spawn;
use tokio::time::timeout;
Expand Down Expand Up @@ -685,8 +656,12 @@ mod test {

let topic: TopicId = blake3::hash(b"foobar").into();
// share info that pi1 is on the same derp_region
ep2.add_known_addrs(pi1, derp_region, &[]).await.unwrap();
ep3.add_known_addrs(pi1, derp_region, &[]).await.unwrap();
ep2.add_peer_addr(PeerAddr::new(pi1).with_derp_region(derp_region))
.await
.unwrap();
ep3.add_peer_addr(PeerAddr::new(pi1).with_derp_region(derp_region))
.await
.unwrap();
// join the topics and wait for the connection to succeed
go1.join(topic, vec![]).await.unwrap();
go2.join(topic, vec![pi1]).await.unwrap().await.unwrap();
Expand Down Expand Up @@ -799,7 +774,7 @@ mod test {
/// [`MagicEndpoint::connect`]: crate::magic_endpoint::MagicEndpoint
pub(crate) async fn run_derp_and_stun(
stun_ip: IpAddr,
) -> Result<(DerpMap, Option<u16>, CleanupDropGuard)> {
) -> Result<(DerpMap, u16, CleanupDropGuard)> {
// TODO: pass a mesh_key?

let server_key = SecretKey::generate();
Expand Down Expand Up @@ -834,7 +809,7 @@ mod test {
server.shutdown().await;
});

Ok((m, Some(region_id), CleanupDropGuard(tx)))
Ok((m, region_id, CleanupDropGuard(tx)))
}

/// Sets up a simple STUN server.
Expand Down
4 changes: 2 additions & 2 deletions iroh-gossip/src/net/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl Dialer {
/// Start to dial a peer
///
/// Note that the peer's addresses and/or derp region must be added to the endpoint's
/// addressbook for a dial to succeed, see [`MagicEndpoint::add_known_addrs`].
/// addressbook for a dial to succeed, see [`MagicEndpoint::add_peer_addr`].
pub fn queue_dial(&mut self, peer_id: PublicKey, alpn_protocol: &'static [u8]) {
if self.is_pending(&peer_id) {
return;
Expand All @@ -117,7 +117,7 @@ impl Dialer {
let res = tokio::select! {
biased;
_ = cancel.cancelled() => Err(anyhow!("Cancelled")),
res = endpoint.connect(peer_id, alpn_protocol, None, &[]) => res
res = endpoint.connect(iroh_net::PeerAddr::new(peer_id), alpn_protocol) => res
};
(peer_id, res)
}
Expand Down
7 changes: 3 additions & 4 deletions iroh-net/examples/magic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use iroh_net::{
derp::DerpMap,
key::{PublicKey, SecretKey},
magic_endpoint::accept_conn,
MagicEndpoint,
MagicEndpoint, PeerAddr,
};
use tracing::{debug, info};
use url::Url;
Expand Down Expand Up @@ -98,9 +98,8 @@ async fn main() -> anyhow::Result<()> {
} => {
let peer_id: PublicKey = peer_id.parse()?;
let addrs = addrs.unwrap_or_default();
let conn = endpoint
.connect(peer_id, EXAMPLE_ALPN, derp_region, &addrs)
.await?;
let peer_addr = PeerAddr::from_parts(peer_id, derp_region, addrs);
let conn = endpoint.connect(peer_addr, EXAMPLE_ALPN).await?;
info!("connected");

let (mut send, mut recv) = conn.open_bi().await?;
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub mod stun;
pub mod tls;
pub mod util;

pub use magic_endpoint::{MagicEndpoint, NodeAddr};
pub use magic_endpoint::{AddrInfo, MagicEndpoint, PeerAddr};

#[cfg(test)]
pub(crate) mod test_utils;
Loading

0 comments on commit f16e439

Please sign in to comment.