Skip to content

Commit

Permalink
Adds support for handling interface changes to mdns behaviour. (#1830)
Browse files Browse the repository at this point in the history
* mdns: handle address changes.

* Update examples.

* Use async-io.

* Fix tokio-chat.

* Address review comments.

* Update if-watch.

* Poll interfaces correctly.

* Use socket2 and remove wasm-time.

* Update if-watch.

* Update versions and changelogs.

* Further changelog updates.

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
Co-authored-by: Roman S. Borschel <roman@parity.io>
  • Loading branch information
3 people committed Dec 3, 2020
1 parent 4bdb61b commit 505a17d
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 167 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
- [`parity-multiaddr` CHANGELOG](misc/multiaddr/CHANGELOG.md)
- [`libp2p-core-derive` CHANGELOG](misc/core-derive/CHANGELOG.md)

# Version 0.32.0 [unreleased]

- Update to `libp2p-mdns-0.26`.

# Version 0.31.2 [2020-12-02]

- Bump minimum `libp2p-core` patch version.
Expand Down
11 changes: 5 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p"
edition = "2018"
description = "Peer-to-peer networking library"
version = "0.31.2"
version = "0.32.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -17,7 +17,7 @@ default = [
"identify",
"kad",
"gossipsub",
"mdns-async-std",
"mdns",
"mplex",
"noise",
"ping",
Expand All @@ -37,8 +37,7 @@ floodsub = ["libp2p-floodsub"]
identify = ["libp2p-identify"]
kad = ["libp2p-kad"]
gossipsub = ["libp2p-gossipsub"]
mdns-async-std = ["libp2p-mdns", "libp2p-mdns/async-std"]
mdns-tokio = ["libp2p-mdns", "libp2p-mdns/tokio"]
mdns = ["libp2p-mdns"]
mplex = ["libp2p-mplex"]
noise = ["libp2p-noise"]
ping = ["libp2p-ping"]
Expand Down Expand Up @@ -87,7 +86,7 @@ wasm-timer = "0.2.4"
[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.25.0", path = "protocols/deflate", optional = true }
libp2p-dns = { version = "0.25.0", path = "transports/dns", optional = true }
libp2p-mdns = { version = "0.25.0", path = "protocols/mdns", optional = true }
libp2p-mdns = { version = "0.26.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.25.1", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.26.0", path = "transports/websocket", optional = true }

Expand Down Expand Up @@ -125,4 +124,4 @@ members = [

[[example]]
name = "chat-tokio"
required-features = ["tcp-tokio", "mdns-tokio"]
required-features = ["tcp-tokio", "mdns"]
9 changes: 4 additions & 5 deletions examples/chat-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ use libp2p::{
core::upgrade,
identity,
floodsub::{self, Floodsub, FloodsubEvent},
// `TokioMdns` is available through the `mdns-tokio` feature.
mdns::{TokioMdns, MdnsEvent},
mdns::{Mdns, MdnsEvent},
mplex,
noise,
swarm::{NetworkBehaviourEventProcess, SwarmBuilder},
Expand Down Expand Up @@ -90,7 +89,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
#[derive(NetworkBehaviour)]
struct MyBehaviour {
floodsub: Floodsub,
mdns: TokioMdns,
mdns: Mdns,
}

impl NetworkBehaviourEventProcess<FloodsubEvent> for MyBehaviour {
Expand Down Expand Up @@ -122,7 +121,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

// Create a Swarm to manage peers and events.
let mut swarm = {
let mdns = TokioMdns::new()?;
let mdns = Mdns::new().await?;
let mut behaviour = MyBehaviour {
floodsub: Floodsub::new(peer_id.clone()),
mdns,
Expand Down Expand Up @@ -172,4 +171,4 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
}
}
}
}
2 changes: 1 addition & 1 deletion examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ fn main() -> Result<(), Box<dyn Error>> {

// Create a Swarm to manage peers and events
let mut swarm = {
let mdns = Mdns::new()?;
let mdns = task::block_on(Mdns::new())?;
let mut behaviour = MyBehaviour {
floodsub: Floodsub::new(local_peer_id.clone()),
mdns,
Expand Down
2 changes: 1 addition & 1 deletion examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ fn main() -> Result<(), Box<dyn Error>> {
// Create a Kademlia behaviour.
let store = MemoryStore::new(local_peer_id.clone());
let kademlia = Kademlia::new(local_peer_id.clone(), store);
let mdns = Mdns::new()?;
let mdns = task::block_on(Mdns::new())?;
let behaviour = MyBehaviour { kademlia, mdns };
Swarm::new(transport, behaviour, local_peer_id)
};
Expand Down
2 changes: 1 addition & 1 deletion examples/mdns-passive-discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn main() -> Result<(), Box<dyn Error>> {
// This example provides passive discovery of the libp2p nodes on the
// network that send mDNS queries and answers.
task::block_on(async move {
let mut service = MdnsService::new()?;
let mut service = MdnsService::new().await?;
loop {
let (srv, packet) = service.next().await;
match packet {
Expand Down
13 changes: 13 additions & 0 deletions protocols/mdns/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# 0.26.0 [unreleased]

- Detect interface changes and join the MDNS multicast
group on all interfaces as they become available.
[PR 1830](https://github.com/libp2p/rust-libp2p/pull/1830).

- Replace the use of macros for abstracting over `tokio`
and `async-std` with the use of `async-io`. As a result
there may now be an additional reactor thread running
called `async-io` when using `tokio`, with the futures
still being polled by the `tokio` runtime.
[PR 1830](https://github.com/libp2p/rust-libp2p/pull/1830).

# 0.25.0 [2020-11-25]

- Update `libp2p-swarm` and `libp2p-core`.
Expand Down
31 changes: 15 additions & 16 deletions protocols/mdns/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "libp2p-mdns"
edition = "2018"
version = "0.25.0"
version = "0.26.0"
description = "Implementation of the libp2p mDNS discovery method"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
Expand All @@ -10,22 +10,21 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
async-std = { version = "1.6.2", optional = true }
data-encoding = "2.0"
dns-parser = "0.8"
either = "1.5.3"
futures = "0.3.1"
lazy_static = "1.2"
async-io = "1.3.0"
data-encoding = "2.3.1"
dns-parser = "0.8.0"
futures = "0.3.8"
if-watch = "0.1.6"
lazy_static = "1.4.0"
libp2p-core = { version = "0.25.0", path = "../../core" }
libp2p-swarm = { version = "0.25.0", path = "../../swarm" }
log = "0.4"
net2 = "0.2"
rand = "0.7"
smallvec = "1.0"
tokio = { version = "0.3", default-features = false, features = ["net"], optional = true }
void = "1.0"
wasm-timer = "0.2.4"
log = "0.4.11"
rand = "0.7.3"
smallvec = "1.5.0"
socket2 = { version = "0.3.17", features = ["reuseport"] }
void = "1.0.2"

[dev-dependencies]
if-addrs = "0.6.4"
tokio = { version = "0.3", default-features = false, features = ["rt", "rt-multi-thread"] }
async-std = "1.7.0"
if-addrs = "0.6.5"
tokio = { version = "0.3.4", default-features = false, features = ["rt", "rt-multi-thread"] }
87 changes: 36 additions & 51 deletions protocols/mdns/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::service::{MdnsPacket, build_query_response, build_service_discovery_response};
use crate::service::{MdnsPacket, MdnsService, build_query_response, build_service_discovery_response};
use async_io::Timer;
use futures::prelude::*;
use libp2p_core::{
Multiaddr,
Expand All @@ -34,21 +35,16 @@ use libp2p_swarm::{
ProtocolsHandler,
protocols_handler::DummyProtocolsHandler
};
use log::warn;
use smallvec::SmallVec;
use std::{cmp, fmt, io, iter, mem, pin::Pin, time::Duration, task::Context, task::Poll};
use wasm_timer::{Delay, Instant};
use std::{cmp, fmt, io, iter, mem, pin::Pin, time::{Duration, Instant}, task::Context, task::Poll};

const MDNS_RESPONSE_TTL: std::time::Duration = Duration::from_secs(5 * 60);

macro_rules! codegen {
($feature_name:expr, $behaviour_name:ident, $maybe_busy_wrapper:ident, $service_name:ty) => {

/// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds
/// them to the topology.
pub struct $behaviour_name {
pub struct Mdns {
/// The inner service.
service: $maybe_busy_wrapper,
service: MdnsBusyWrapper,

/// List of nodes that we have discovered, the address, and when their TTL expires.
///
Expand All @@ -59,44 +55,44 @@ pub struct $behaviour_name {
/// Future that fires when the TTL of at least one node in `discovered_nodes` expires.
///
/// `None` if `discovered_nodes` is empty.
closest_expiration: Option<Delay>,
closest_expiration: Option<Timer>,
}

/// `MdnsService::next` takes ownership of `self`, returning a future that resolves with both itself
/// and a `MdnsPacket` (similar to the old Tokio socket send style). The two states are thus `Free`
/// with an `MdnsService` or `Busy` with a future returning the original `MdnsService` and an
/// `MdnsPacket`.
enum $maybe_busy_wrapper {
Free($service_name),
Busy(Pin<Box<dyn Future<Output = ($service_name, MdnsPacket)> + Send>>),
enum MdnsBusyWrapper {
Free(MdnsService),
Busy(Pin<Box<dyn Future<Output = (MdnsService, MdnsPacket)> + Send>>),
Poisoned,
}

impl fmt::Debug for $maybe_busy_wrapper {
impl fmt::Debug for MdnsBusyWrapper {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
$maybe_busy_wrapper::Free(service) => {
fmt.debug_struct("$maybe_busy_wrapper::Free")
Self::Free(service) => {
fmt.debug_struct("MdnsBusyWrapper::Free")
.field("service", service)
.finish()
},
$maybe_busy_wrapper::Busy(_) => {
fmt.debug_struct("$maybe_busy_wrapper::Busy")
Self::Busy(_) => {
fmt.debug_struct("MdnsBusyWrapper::Busy")
.finish()
}
$maybe_busy_wrapper::Poisoned => {
fmt.debug_struct("$maybe_busy_wrapper::Poisoned")
Self::Poisoned => {
fmt.debug_struct("MdnsBusyWrapper::Poisoned")
.finish()
}
}
}
}

impl $behaviour_name {
impl Mdns {
/// Builds a new `Mdns` behaviour.
pub fn new() -> io::Result<$behaviour_name> {
Ok($behaviour_name {
service: $maybe_busy_wrapper::Free(<$service_name>::new()?),
pub async fn new() -> io::Result<Self> {
Ok(Self {
service: MdnsBusyWrapper::Free(MdnsService::new().await?),
discovered_nodes: SmallVec::new(),
closest_expiration: None,
})
Expand All @@ -113,7 +109,7 @@ impl $behaviour_name {
}
}

impl NetworkBehaviour for $behaviour_name {
impl NetworkBehaviour for Mdns {
type ProtocolsHandler = DummyProtocolsHandler;
type OutEvent = MdnsEvent;

Expand All @@ -138,9 +134,9 @@ impl NetworkBehaviour for $behaviour_name {
&mut self,
_: PeerId,
_: ConnectionId,
_ev: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
ev: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
void::unreachable(_ev)
void::unreachable(ev)
}

fn poll(
Expand All @@ -155,9 +151,8 @@ impl NetworkBehaviour for $behaviour_name {
> {
// Remove expired peers.
if let Some(ref mut closest_expiration) = self.closest_expiration {
match Future::poll(Pin::new(closest_expiration), cx) {
Poll::Ready(Ok(())) => {
let now = Instant::now();
match Pin::new(closest_expiration).poll(cx) {
Poll::Ready(now) => {
let mut expired = SmallVec::<[(PeerId, Multiaddr); 4]>::new();
while let Some(pos) = self.discovered_nodes.iter().position(|(_, _, exp)| *exp < now) {
let (peer_id, addr, _) = self.discovered_nodes.remove(pos);
Expand All @@ -173,38 +168,37 @@ impl NetworkBehaviour for $behaviour_name {
}
},
Poll::Pending => (),
Poll::Ready(Err(err)) => warn!("timer has errored: {:?}", err),
}
}

// Polling the mDNS service, and obtain the list of nodes discovered this round.
let discovered = loop {
let service = mem::replace(&mut self.service, $maybe_busy_wrapper::Poisoned);
let service = mem::replace(&mut self.service, MdnsBusyWrapper::Poisoned);

let packet = match service {
$maybe_busy_wrapper::Free(service) => {
self.service = $maybe_busy_wrapper::Busy(Box::pin(service.next()));
MdnsBusyWrapper::Free(service) => {
self.service = MdnsBusyWrapper::Busy(Box::pin(service.next()));
continue;
},
$maybe_busy_wrapper::Busy(mut fut) => {
MdnsBusyWrapper::Busy(mut fut) => {
match fut.as_mut().poll(cx) {
Poll::Ready((service, packet)) => {
self.service = $maybe_busy_wrapper::Free(service);
self.service = MdnsBusyWrapper::Free(service);
packet
},
Poll::Pending => {
self.service = $maybe_busy_wrapper::Busy(fut);
self.service = MdnsBusyWrapper::Busy(fut);
return Poll::Pending;
}
}
},
$maybe_busy_wrapper::Poisoned => panic!("Mdns poisoned"),
MdnsBusyWrapper::Poisoned => panic!("Mdns poisoned"),
};

match packet {
MdnsPacket::Query(query) => {
// MaybeBusyMdnsService should always be Free.
if let $maybe_busy_wrapper::Free(ref mut service) = self.service {
if let MdnsBusyWrapper::Free(ref mut service) = self.service {
let resp = build_query_response(
query.query_id(),
params.local_peer_id().clone(),
Expand Down Expand Up @@ -256,7 +250,7 @@ impl NetworkBehaviour for $behaviour_name {
},
MdnsPacket::ServiceDiscovery(disc) => {
// MaybeBusyMdnsService should always be Free.
if let $maybe_busy_wrapper::Free(ref mut service) = self.service {
if let MdnsBusyWrapper::Free(ref mut service) = self.service {
let resp = build_service_discovery_response(
disc.query_id(),
MDNS_RESPONSE_TTL,
Expand All @@ -273,31 +267,22 @@ impl NetworkBehaviour for $behaviour_name {
.fold(None, |exp, &(_, _, elem_exp)| {
Some(exp.map(|exp| cmp::min(exp, elem_exp)).unwrap_or(elem_exp))
})
.map(Delay::new_at);
.map(Timer::at);

Poll::Ready(NetworkBehaviourAction::GenerateEvent(MdnsEvent::Discovered(DiscoveredAddrsIter {
inner: discovered.into_iter(),
})))
}
}

impl fmt::Debug for $behaviour_name {
impl fmt::Debug for Mdns {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Mdns")
.field("service", &self.service)
.finish()
}
}

};
}

#[cfg(feature = "async-std")]
codegen!("async-std", Mdns, MaybeBusyMdnsService, crate::service::MdnsService);

#[cfg(feature = "tokio")]
codegen!("tokio", TokioMdns, MaybeBusyTokioMdnsService, crate::service::TokioMdnsService);

/// Event that can be produced by the `Mdns` behaviour.
#[derive(Debug)]
pub enum MdnsEvent {
Expand Down
Loading

0 comments on commit 505a17d

Please sign in to comment.