Skip to content

Commit

Permalink
[libp2p-dns] Implement /dnsaddr resolution. (#1931)
Browse files Browse the repository at this point in the history
* Implement `/dnsaddr` support on `libp2p-dns`.

To that end, since resolving `/dnsaddr` addresses needs
"fully qualified" multiaddresses when dialing, i.e. those
that end with the `/p2p/...` protocol, we make sure that
dialing always uses such fully qualified addresses by
appending the `/p2p` protocol as necessary. As a side-effect,
this adds support for dialing peers via "fully qualified"
addresses, as an alternative to using a `PeerId` together
with a `Multiaddr` with or without the `/p2p` protocol.

* Adapt libp2p-relay.

* Update versions, changelogs and small cleanups.
  • Loading branch information
romanb committed Mar 17, 2021
1 parent c1f75ee commit 45f07bf
Show file tree
Hide file tree
Showing 57 changed files with 741 additions and 312 deletions.
22 changes: 11 additions & 11 deletions Cargo.toml
Expand Up @@ -64,35 +64,35 @@ atomic = "0.5.0"
bytes = "1"
futures = "0.3.1"
lazy_static = "1.2"
libp2p-core = { version = "0.27.2", path = "core", default-features = false }
libp2p-core = { version = "0.28.0", path = "core", default-features = false }
libp2p-floodsub = { version = "0.28.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.29.0", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.28.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.29.0", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.27.2", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.29.0", path = "transports/noise", optional = true }
libp2p-mplex = { version = "0.28.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.30.0", path = "transports/noise", optional = true }
libp2p-ping = { version = "0.28.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.27.1", path = "transports/plaintext", optional = true }
libp2p-plaintext = { version = "0.28.0", path = "transports/plaintext", optional = true }
libp2p-pnet = { version = "0.20.0", path = "transports/pnet", optional = true }
libp2p-relay = { version = "0.1.0", path = "protocols/relay", optional = true }
libp2p-request-response = { version = "0.10.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.28.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.22.0", path = "swarm-derive" }
libp2p-uds = { version = "0.27.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.27.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.30.1", path = "muxers/yamux", optional = true }
multiaddr = { package = "parity-multiaddr", version = "0.11.1", path = "misc/multiaddr" }
libp2p-uds = { version = "0.28.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.28.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.31.0", path = "muxers/yamux", optional = true }
multiaddr = { package = "parity-multiaddr", version = "0.11.2", path = "misc/multiaddr" }
parking_lot = "0.11.0"
pin-project = "1.0.0"
smallvec = "1.6.1"
wasm-timer = "0.2.4"

[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.27.1", path = "transports/deflate", optional = true }
libp2p-deflate = { version = "0.28.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.28.0", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.29.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.27.1", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.28.0", path = "transports/websocket", optional = true }
libp2p-tcp = { version = "0.28.0", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.29.0", path = "transports/websocket", optional = true }

[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
Expand Down
10 changes: 9 additions & 1 deletion core/CHANGELOG.md
@@ -1,4 +1,12 @@
# 0.27.2 [unreleased]
# 0.28.0 [unreleased]

- `Network::dial()` understands `/p2p` addresses and `Transport::dial`
gets a "fully qualified" `/p2p` address when dialing a specific peer,
whether through the `Network::peer()` API or via `Network::dial()`
with a `/p2p` address.

- `Network::dial()` and `network::Peer::dial()` return a `DialError`
on error.

- Shorten and unify `Debug` impls of public keys.

Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-core"
edition = "2018"
description = "Core traits and structs of libp2p"
version = "0.27.2"
version = "0.28.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
2 changes: 1 addition & 1 deletion core/src/connection/pool.rs
Expand Up @@ -554,7 +554,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>

/// Returns an iterator over all connected peers, i.e. those that have
/// at least one established connection in the pool.
pub fn iter_connected<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
pub fn iter_connected(&self) -> impl Iterator<Item = &PeerId> {
self.established.keys()
}

Expand Down
78 changes: 66 additions & 12 deletions core/src/network.rs
Expand Up @@ -209,13 +209,14 @@ where
&self.local_peer_id
}

/// Dials a multiaddress without expecting a particular remote peer ID.
/// Dials a [`Multiaddr`] that may or may not encapsulate a
/// specific expected remote peer ID.
///
/// The given `handler` will be used to create the
/// [`Connection`](crate::connection::Connection) upon success and the
/// connection ID is returned.
pub fn dial(&mut self, address: &Multiaddr, handler: THandler)
-> Result<ConnectionId, ConnectionLimit>
-> Result<ConnectionId, DialError>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
Expand All @@ -225,15 +226,32 @@ where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
// If the address ultimately encapsulates an expected peer ID, dial that peer
// such that any mismatch is detected. We do not "pop off" the `P2p` protocol
// from the address, because it may be used by the `Transport`, i.e. `P2p`
// is a protocol component that can influence any transport, like `libp2p-dns`.
if let Some(multiaddr::Protocol::P2p(ma)) = address.iter().last() {
if let Ok(peer) = PeerId::try_from(ma) {
return self.dial_peer(DialingOpts {
peer,
address: address.clone(),
handler,
remaining: Vec::new(),
})
}
}

// The address does not specify an expected peer, so just try to dial it as-is,
// accepting any peer ID that the remote identifies as.
let info = OutgoingInfo { address, peer_id: None };
match self.transport().clone().dial(address.clone()) {
Ok(f) => {
let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
self.pool.add_outgoing(f, handler, info)
self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit)
}
Err(err) => {
let f = future::err(PendingConnectionError::Transport(err));
self.pool.add_outgoing(f, handler, info)
self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit)
}
}
}
Expand Down Expand Up @@ -430,7 +448,7 @@ where

/// Initiates a connection attempt to a known peer.
fn dial_peer(&mut self, opts: DialingOpts<PeerId, THandler>)
-> Result<ConnectionId, ConnectionLimit>
-> Result<ConnectionId, DialError>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Dial: Send + 'static,
Expand Down Expand Up @@ -460,7 +478,7 @@ fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans>(
<THandler::Handler as ConnectionHandler>::Error>,
dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
opts: DialingOpts<PeerId, THandler>
) -> Result<ConnectionId, ConnectionLimit>
) -> Result<ConnectionId, DialError>
where
THandler: IntoConnectionHandler + Send + 'static,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
Expand All @@ -478,23 +496,28 @@ where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
let result = match transport.dial(opts.address.clone()) {
// Ensure the address to dial encapsulates the `p2p` protocol for the
// targeted peer, so that the transport has a "fully qualified" address
// to work with.
let addr = p2p_addr(opts.peer, opts.address).map_err(DialError::InvalidAddress)?;

let result = match transport.dial(addr.clone()) {
Ok(fut) => {
let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e)));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info)
let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit)
},
Err(err) => {
let fut = future::err(PendingConnectionError::Transport(err));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info)
let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit)
},
};

if let Ok(id) = &result {
dialing.entry(opts.peer).or_default().push(
peer::DialingState {
current: (*id, opts.address),
current: (*id, addr),
remaining: opts.remaining,
},
);
Expand Down Expand Up @@ -668,6 +691,37 @@ impl NetworkConfig {
}
}

/// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer.
///
/// If the given address is already a `p2p` address for the given peer,
/// i.e. the last encapsulated protocol is `/p2p/<peer-id>`, this is a no-op.
///
/// If the given address is already a `p2p` address for a different peer
/// than the one given, the given `Multiaddr` is returned as an `Err`.
///
/// If the given address is not yet a `p2p` address for the given peer,
/// the `/p2p/<peer-id>` protocol is appended to the returned address.
fn p2p_addr(peer: PeerId, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
if let Some(multiaddr::Protocol::P2p(hash)) = addr.iter().last() {
if &hash != peer.as_ref() {
return Err(addr)
}
Ok(addr)
} else {
Ok(addr.with(multiaddr::Protocol::P2p(peer.into())))
}
}

/// Possible (synchronous) errors when dialing a peer.
#[derive(Clone, Debug)]
pub enum DialError {
/// The dialing attempt is rejected because of a connection limit.
ConnectionLimit(ConnectionLimit),
/// The address being dialed is invalid, e.g. if it refers to a different
/// remote peer than the one being dialed.
InvalidAddress(Multiaddr),
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
8 changes: 5 additions & 3 deletions core/src/network/peer.rs
Expand Up @@ -45,7 +45,7 @@ use std::{
error,
fmt,
};
use super::{Network, DialingOpts};
use super::{Network, DialingOpts, DialError};

/// The possible representations of a peer in a [`Network`], as
/// seen by the local node.
Expand Down Expand Up @@ -210,7 +210,7 @@ where
pub fn dial<I>(self, address: Multiaddr, remaining: I, handler: THandler)
-> Result<
(ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
ConnectionLimit
DialError
>
where
I: IntoIterator<Item = Multiaddr>,
Expand All @@ -219,7 +219,9 @@ where
Peer::Connected(p) => (p.peer_id, p.network),
Peer::Dialing(p) => (p.peer_id, p.network),
Peer::Disconnected(p) => (p.peer_id, p.network),
Peer::Local => return Err(ConnectionLimit { current: 0, limit: 0 })
Peer::Local => return Err(DialError::ConnectionLimit(ConnectionLimit {
current: 0, limit: 0
}))
};

let id = network.dial_peer(DialingOpts {
Expand Down
19 changes: 7 additions & 12 deletions core/src/transport/memory.rs
Expand Up @@ -263,19 +263,14 @@ impl Drop for Listener {

/// If the address is `/memory/n`, returns the value of `n`.
fn parse_memory_addr(a: &Multiaddr) -> Result<u64, ()> {
let mut iter = a.iter();

let port = if let Some(Protocol::Memory(port)) = iter.next() {
port
} else {
return Err(());
};

if iter.next().is_some() {
return Err(());
let mut protocols = a.iter();
match protocols.next() {
Some(Protocol::Memory(port)) => match protocols.next() {
None | Some(Protocol::P2p(_)) => Ok(port),
_ => Err(())
}
_ => Err(())
}

Ok(port)
}

/// A channel represents an established, in-memory, logical connection between two endpoints.
Expand Down
16 changes: 10 additions & 6 deletions core/tests/connection_limits.rs
Expand Up @@ -25,7 +25,7 @@ use libp2p_core::multiaddr::{multiaddr, Multiaddr};
use libp2p_core::{
PeerId,
connection::PendingConnectionError,
network::{NetworkEvent, NetworkConfig, ConnectionLimits},
network::{NetworkEvent, NetworkConfig, ConnectionLimits, DialError},
};
use rand::Rng;
use std::task::Poll;
Expand All @@ -47,12 +47,16 @@ fn max_outgoing() {
.expect("Unexpected connection limit.");
}

let err = network.peer(target.clone())
match network.peer(target.clone())
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.expect_err("Unexpected dialing success.");

assert_eq!(err.current, outgoing_limit);
assert_eq!(err.limit, outgoing_limit);
.expect_err("Unexpected dialing success.")
{
DialError::ConnectionLimit(err) => {
assert_eq!(err.current, outgoing_limit);
assert_eq!(err.limit, outgoing_limit);
}
e => panic!("Unexpected error: {:?}", e),
}

let info = network.info();
assert_eq!(info.num_peers(), 0);
Expand Down
17 changes: 12 additions & 5 deletions core/tests/network_dial_error.rs
Expand Up @@ -25,6 +25,7 @@ use libp2p_core::multiaddr::multiaddr;
use libp2p_core::{
PeerId,
connection::PendingConnectionError,
multiaddr::Protocol,
network::{NetworkEvent, NetworkConfig},
};
use rand::seq::SliceRandom;
Expand Down Expand Up @@ -70,7 +71,7 @@ fn deny_incoming_connec() {
error: PendingConnectionError::Transport(_)
}) => {
assert_eq!(&peer_id, swarm1.local_peer_id());
assert_eq!(multiaddr, address);
assert_eq!(multiaddr, address.clone().with(Protocol::P2p(peer_id.into())));
return Poll::Ready(Ok(()));
},
Poll::Ready(_) => unreachable!(),
Expand Down Expand Up @@ -162,21 +163,27 @@ fn dial_self_by_id() {
fn multiple_addresses_err() {
// Tries dialing multiple addresses, and makes sure there's one dialing error per address.

let target = PeerId::random();

let mut swarm = test_network(NetworkConfig::default());

let mut addresses = Vec::new();
for _ in 0 .. 3 {
addresses.push(multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())]);
addresses.push(multiaddr![
Ip4([0, 0, 0, 0]),
Tcp(rand::random::<u16>())
]);
}
for _ in 0 .. 5 {
addresses.push(multiaddr![Udp(rand::random::<u16>())]);
addresses.push(multiaddr![
Udp(rand::random::<u16>())
]);
}
addresses.shuffle(&mut rand::thread_rng());

let first = addresses[0].clone();
let rest = (&addresses[1..]).iter().cloned();

let target = PeerId::random();
swarm.peer(target.clone())
.dial(first, rest, TestHandler())
.unwrap();
Expand All @@ -191,7 +198,7 @@ fn multiple_addresses_err() {
error: PendingConnectionError::Transport(_)
}) => {
assert_eq!(peer_id, target);
let expected = addresses.remove(0);
let expected = addresses.remove(0).with(Protocol::P2p(target.clone().into()));
assert_eq!(multiaddr, expected);
if addresses.is_empty() {
assert_eq!(attempts_remaining, 0);
Expand Down

0 comments on commit 45f07bf

Please sign in to comment.