Skip to content

Commit

Permalink
Use async-io.
Browse files Browse the repository at this point in the history
  • Loading branch information
dvc94ch committed Nov 17, 2020
1 parent c214ab5 commit b7f33f6
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 90 deletions.
7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ default = [
"identify",
"kad",
"gossipsub",
"mdns-async-std",
"mdns",
"mplex",
"noise",
"ping",
Expand All @@ -37,8 +37,7 @@ floodsub = ["libp2p-floodsub"]
identify = ["libp2p-identify"]
kad = ["libp2p-kad"]
gossipsub = ["libp2p-gossipsub"]
mdns-async-std = ["libp2p-mdns", "libp2p-mdns/async-std"]
mdns-tokio = ["libp2p-mdns", "libp2p-mdns/tokio"]
mdns = ["libp2p-mdns"]
mplex = ["libp2p-mplex"]
noise = ["libp2p-noise"]
ping = ["libp2p-ping"]
Expand Down Expand Up @@ -125,4 +124,4 @@ members = [

[[example]]
name = "chat-tokio"
required-features = ["tcp-tokio", "mdns-tokio"]
required-features = ["tcp-tokio", "mdns"]
6 changes: 3 additions & 3 deletions protocols/mdns/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
async-std = { version = "1.6.2", optional = true }
async-io = "1.2.0"
either = "1.5.3"
data-encoding = "2.0"
dns-parser = "0.8"
either = "1.5.3"
futures = "0.3.1"
if-watch = "0.1.2"
lazy_static = "1.2"
Expand All @@ -23,10 +23,10 @@ log = "0.4"
net2 = "0.2"
rand = "0.7"
smallvec = "1.0"
tokio = { version = "0.3", default-features = false, features = ["net"], optional = true }
void = "1.0"
wasm-timer = "0.2.4"

[dev-dependencies]
async-std = "1.7.0"
if-addrs = "0.6.4"
tokio = { version = "0.3", default-features = false, features = ["rt", "rt-multi-thread"] }
68 changes: 28 additions & 40 deletions protocols/mdns/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::service::{MdnsPacket, build_query_response, build_service_discovery_response};
use crate::service::{MdnsPacket, MdnsService, build_query_response, build_service_discovery_response};
use futures::prelude::*;
use libp2p_core::{
Multiaddr,
Expand All @@ -41,14 +41,11 @@ use wasm_timer::{Delay, Instant};

const MDNS_RESPONSE_TTL: std::time::Duration = Duration::from_secs(5 * 60);

macro_rules! codegen {
($feature_name:expr, $behaviour_name:ident, $maybe_busy_wrapper:ident, $service_name:ty) => {

/// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds
/// them to the topology.
pub struct $behaviour_name {
pub struct Mdns {
/// The inner service.
service: $maybe_busy_wrapper,
service: MdnsBusyWrapper,

/// List of nodes that we have discovered, the address, and when their TTL expires.
///
Expand All @@ -66,37 +63,37 @@ pub struct $behaviour_name {
/// and a `MdnsPacket` (similar to the old Tokio socket send style). The two states are thus `Free`
/// with an `MdnsService` or `Busy` with a future returning the original `MdnsService` and an
/// `MdnsPacket`.
enum $maybe_busy_wrapper {
Free($service_name),
Busy(Pin<Box<dyn Future<Output = ($service_name, MdnsPacket)> + Send>>),
enum MdnsBusyWrapper {
Free(MdnsService),
Busy(Pin<Box<dyn Future<Output = (MdnsService, MdnsPacket)> + Send>>),
Poisoned,
}

impl fmt::Debug for $maybe_busy_wrapper {
impl fmt::Debug for MdnsBusyWrapper {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
$maybe_busy_wrapper::Free(service) => {
fmt.debug_struct("$maybe_busy_wrapper::Free")
Self::Free(service) => {
fmt.debug_struct("MdnsBusyWrapper::Free")
.field("service", service)
.finish()
},
$maybe_busy_wrapper::Busy(_) => {
fmt.debug_struct("$maybe_busy_wrapper::Busy")
Self::Busy(_) => {
fmt.debug_struct("MdnsBusyWrapper::Busy")
.finish()
}
$maybe_busy_wrapper::Poisoned => {
fmt.debug_struct("$maybe_busy_wrapper::Poisoned")
Self::Poisoned => {
fmt.debug_struct("MdnsBusyWrapper::Poisoned")
.finish()
}
}
}
}

impl $behaviour_name {
impl Mdns {
/// Builds a new `Mdns` behaviour.
pub async fn new() -> io::Result<$behaviour_name> {
Ok($behaviour_name {
service: $maybe_busy_wrapper::Free(<$service_name>::new().await?),
pub async fn new() -> io::Result<Self> {
Ok(Self {
service: MdnsBusyWrapper::Free(MdnsService::new().await?),
discovered_nodes: SmallVec::new(),
closest_expiration: None,
})
Expand All @@ -113,7 +110,7 @@ impl $behaviour_name {
}
}

impl NetworkBehaviour for $behaviour_name {
impl NetworkBehaviour for Mdns {
type ProtocolsHandler = DummyProtocolsHandler;
type OutEvent = MdnsEvent;

Expand Down Expand Up @@ -179,32 +176,32 @@ impl NetworkBehaviour for $behaviour_name {

// Polling the mDNS service, and obtain the list of nodes discovered this round.
let discovered = loop {
let service = mem::replace(&mut self.service, $maybe_busy_wrapper::Poisoned);
let service = mem::replace(&mut self.service, MdnsBusyWrapper::Poisoned);

let packet = match service {
$maybe_busy_wrapper::Free(service) => {
self.service = $maybe_busy_wrapper::Busy(Box::pin(service.next()));
MdnsBusyWrapper::Free(service) => {
self.service = MdnsBusyWrapper::Busy(Box::pin(service.next()));
continue;
},
$maybe_busy_wrapper::Busy(mut fut) => {
MdnsBusyWrapper::Busy(mut fut) => {
match fut.as_mut().poll(cx) {
Poll::Ready((service, packet)) => {
self.service = $maybe_busy_wrapper::Free(service);
self.service = MdnsBusyWrapper::Free(service);
packet
},
Poll::Pending => {
self.service = $maybe_busy_wrapper::Busy(fut);
self.service = MdnsBusyWrapper::Busy(fut);
return Poll::Pending;
}
}
},
$maybe_busy_wrapper::Poisoned => panic!("Mdns poisoned"),
MdnsBusyWrapper::Poisoned => panic!("Mdns poisoned"),
};

match packet {
MdnsPacket::Query(query) => {
// MaybeBusyMdnsService should always be Free.
if let $maybe_busy_wrapper::Free(ref mut service) = self.service {
if let MdnsBusyWrapper::Free(ref mut service) = self.service {
let resp = build_query_response(
query.query_id(),
params.local_peer_id().clone(),
Expand Down Expand Up @@ -256,7 +253,7 @@ impl NetworkBehaviour for $behaviour_name {
},
MdnsPacket::ServiceDiscovery(disc) => {
// MaybeBusyMdnsService should always be Free.
if let $maybe_busy_wrapper::Free(ref mut service) = self.service {
if let MdnsBusyWrapper::Free(ref mut service) = self.service {
let resp = build_service_discovery_response(
disc.query_id(),
MDNS_RESPONSE_TTL,
Expand All @@ -281,23 +278,14 @@ impl NetworkBehaviour for $behaviour_name {
}
}

impl fmt::Debug for $behaviour_name {
impl fmt::Debug for Mdns {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Mdns")
.field("service", &self.service)
.finish()
}
}

};
}

#[cfg(feature = "async-std")]
codegen!("async-std", Mdns, MaybeBusyMdnsService, crate::service::MdnsService);

#[cfg(feature = "tokio")]
codegen!("tokio", TokioMdns, MaybeBusyTokioMdnsService, crate::service::TokioMdnsService);

/// Event that can be produced by the `Mdns` behaviour.
#[derive(Debug)]
pub enum MdnsEvent {
Expand Down
10 changes: 4 additions & 6 deletions protocols/mdns/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,10 @@ const SERVICE_NAME: &[u8] = b"_p2p._udp.local";
/// Hardcoded name of the service used for DNS-SD.
const META_QUERY_SERVICE: &[u8] = b"_services._dns-sd._udp.local";

#[cfg(feature = "async-std")]
pub use self::{behaviour::Mdns, service::MdnsService};
#[cfg(feature = "tokio")]
pub use self::{behaviour::TokioMdns, service::TokioMdnsService};

pub use self::behaviour::MdnsEvent;
pub use crate::{
behaviour::{Mdns, MdnsEvent},
service::MdnsService,
};

mod behaviour;
mod dns;
Expand Down
51 changes: 16 additions & 35 deletions protocols/mdns/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
// DEALINGS IN THE SOFTWARE.

use crate::{SERVICE_NAME, META_QUERY_SERVICE, dns};
use async_io::Async;
use dns_parser::{Packet, RData};
use either::Either::{Left, Right};
use futures::{future, prelude::*};
use libp2p_core::{multiaddr::{Multiaddr, Protocol}, PeerId};
use log::warn;
use std::{convert::TryFrom as _, fmt, io, net::Ipv4Addr, net::SocketAddr, str, time::{Duration, Instant}};
use std::{convert::TryFrom, fmt, io, net::Ipv4Addr, net::{UdpSocket, SocketAddr}, str, time::{Duration, Instant}};
use wasm_timer::Interval;
use lazy_static::lazy_static;

Expand All @@ -37,9 +38,6 @@ lazy_static! {
));
}

macro_rules! codegen {
($feature_name:expr, $service_name:ident, $udp_socket:ty, $udp_socket_from_std:tt) => {

/// A running service that discovers libp2p peers and responds to other libp2p peers' queries on
/// the local network.
///
Expand Down Expand Up @@ -71,10 +69,7 @@ macro_rules! codegen {
/// # let my_peer_id = PeerId::from(identity::Keypair::generate_ed25519().public());
/// # let my_listened_addrs: Vec<Multiaddr> = vec![];
/// # async {
/// # #[cfg(feature = "async-std")]
/// # let mut service = libp2p_mdns::service::MdnsService::new().await.unwrap();
/// # #[cfg(feature = "tokio")]
/// # let mut service = libp2p_mdns::service::TokioMdnsService::new().await.unwrap();
/// let _future_to_poll = async {
/// let (mut service, packet) = service.next().await;
///
Expand Down Expand Up @@ -108,13 +103,12 @@ macro_rules! codegen {
/// };
/// # };
/// # }
#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))]
pub struct $service_name {
pub struct MdnsService {
/// Main socket for listening.
socket: $udp_socket,
socket: Async<UdpSocket>,

/// Socket for sending queries on the network.
query_socket: $udp_socket,
query_socket: Async<UdpSocket>,

/// Interval for sending queries.
query_interval: Interval,
Expand All @@ -137,42 +131,41 @@ pub struct $service_name {
if_watch: if_watch::IfWatcher,
}

impl $service_name {
impl MdnsService {
/// Starts a new mDNS service.
pub async fn new() -> io::Result<$service_name> {
pub async fn new() -> io::Result<Self> {
Self::new_inner(false).await
}

/// Same as `new`, but we don't automatically send queries on the network.
pub async fn silent() -> io::Result<$service_name> {
pub async fn silent() -> io::Result<Self> {
Self::new_inner(true).await
}

/// Starts a new mDNS service.
async fn new_inner(silent: bool) -> io::Result<$service_name> {
async fn new_inner(silent: bool) -> io::Result<Self> {
let socket = {
let builder = net2::UdpBuilder::new_v4()?;
builder.reuse_address(true)?;
#[cfg(unix)]
net2::unix::UnixUdpBuilderExt::reuse_port(&builder, true)?;
let socket = builder.bind((Ipv4Addr::UNSPECIFIED, 5353))?;
let socket = $udp_socket_from_std(socket)?;
socket.set_multicast_loop_v4(true)?;
socket.set_multicast_ttl_v4(255)?;
socket
Async::new(socket)?
};

// Given that we pass an IP address to bind, which does not need to be resolved, we can
// use std::net::UdpSocket::bind, instead of its async counterpart from async-std.
let query_socket = {
let socket = std::net::UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0))?;
$udp_socket_from_std(socket)?
Async::new(socket)?
};


let if_watch = if_watch::IfWatcher::new().await?;

Ok($service_name {
Ok(Self {
socket,
query_socket,
query_interval: Interval::new_at(Instant::now(), Duration::from_secs(20)),
Expand Down Expand Up @@ -217,7 +210,8 @@ impl $service_name {
if let Ok(if_watch::IfEvent::Up(inet)) = event {
if inet.addr().is_ipv4() && !inet.addr().is_loopback() {
self.socket
.join_multicast_v4(From::from([224, 0, 0, 251]), Ipv4Addr::UNSPECIFIED)
.get_ref()
.join_multicast_v4(&From::from([224, 0, 0, 251]), &Ipv4Addr::UNSPECIFIED)
.ok();
}
}
Expand Down Expand Up @@ -294,25 +288,14 @@ impl $service_name {
}
}

impl fmt::Debug for $service_name {
impl fmt::Debug for MdnsService {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("$service_name")
.field("silent", &self.silent)
.finish()
}
}

};
}

#[cfg(feature = "async-std")]
codegen!("async-std", MdnsService, async_std::net::UdpSocket, (|socket| Ok::<_, std::io::Error>(async_std::net::UdpSocket::from(socket))));

// Note: Tokio's UdpSocket::from_std does not set the socket into non-blocking mode.
#[cfg(feature = "tokio")]
codegen!("tokio", TokioMdnsService, tokio::net::UdpSocket, (|socket: std::net::UdpSocket| { socket.set_nonblocking(true); tokio::net::UdpSocket::from_std(socket) }));


/// A valid mDNS packet received by the service.
#[derive(Debug)]
pub enum MdnsPacket {
Expand Down Expand Up @@ -700,17 +683,15 @@ mod tests {
}
}

#[cfg(feature = "async-std")]
testgen!(
async_std,
crate::service::MdnsService,
(|fut| async_std::task::block_on::<_, ()>(fut))
);

#[cfg(feature = "tokio")]
testgen!(
tokio,
crate::service::TokioMdnsService,
crate::service::MdnsService,
(|fut| tokio::runtime::Runtime::new().unwrap().block_on::<futures::future::BoxFuture<()>>(fut))
);
}
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ pub use libp2p_gossipsub as gossipsub;
#[cfg_attr(docsrs, doc(cfg(feature = "mplex")))]
#[doc(inline)]
pub use libp2p_mplex as mplex;
#[cfg(any(feature = "mdns-async-std", feature = "mdns-tokio"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "mdns-async-std", feature = "mdns-tokio"))))]
#[cfg(feature = "mdns")]
#[cfg_attr(docsrs, doc(cfg(feature = "mdns")))]
#[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))]
#[doc(inline)]
pub use libp2p_mdns as mdns;
Expand Down

0 comments on commit b7f33f6

Please sign in to comment.