Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mdns: Refactor encoding/decoding #2162

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion protocols/mdns/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
anyhow = "1.0.42"
async-io = "1.3.1"
byteorder = "1.4.3"
data-encoding = "2.3.2"
dns-parser = "0.8.0"
futures = "0.3.13"
if-watch = "0.2.0"
lazy_static = "1.4.0"
Expand Down
146 changes: 86 additions & 60 deletions protocols/mdns/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::dns::{build_query, build_query_response, build_service_discovery_response};
use crate::query::MdnsPacket;
use crate::packet::MdnsPacket;
use crate::IPV4_MDNS_MULTICAST_ADDRESS;
use async_io::{Async, Timer};
use futures::prelude::*;
Expand Down Expand Up @@ -153,6 +152,13 @@ impl Mdns {
Async::new(socket)?
};
let if_watch = if_watch::IfWatcher::new().await?;
// Randomize timer to prevent timers across nodes to converge and fire at the same time.
let query_interval = {
use rand::Rng;
let mut rng = rand::thread_rng();
let jitter = rng.gen_range(0..100);
config.query_interval + Duration::from_millis(jitter)
dvc94ch marked this conversation as resolved.
Show resolved Hide resolved
};
Ok(Self {
recv_socket,
send_socket,
Expand All @@ -162,9 +168,9 @@ impl Mdns {
discovered_nodes: SmallVec::new(),
closest_expiration: None,
events: Default::default(),
query_interval: config.query_interval,
query_interval,
ttl: config.ttl,
timeout: Timer::interval(config.query_interval),
timeout: Timer::interval(query_interval),
multicast_addr: config.multicast_addr,
})
}
Expand All @@ -179,55 +185,74 @@ impl Mdns {
self.discovered_nodes.iter().map(|(p, _, _)| p)
}

fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) {
fn reset_timer(&mut self) {
self.timeout.set_interval(self.query_interval);
}

fn fire_timer(&mut self) {
self.timeout
.set_interval_at(Instant::now(), self.query_interval);
mxinden marked this conversation as resolved.
Show resolved Hide resolved
}

fn send_packet(&mut self, packet: MdnsPacket) {
for packet in packet.to_encoded_packets() {
self.send_buffer.push_back(packet);
}
}

fn inject_mdns_packet(
&mut self,
packet: MdnsPacket,
from: SocketAddr,
params: &impl PollParameters,
) {
match packet {
MdnsPacket::Query(query) => {
self.timeout.set_interval(self.query_interval);
MdnsPacket::PeerRequest { query_id } => {
self.reset_timer();
dvc94ch marked this conversation as resolved.
Show resolved Hide resolved
log::trace!("sending response");
for packet in build_query_response(
query.query_id(),
*params.local_peer_id(),
params.listened_addresses(),
self.ttl,
) {
self.send_buffer.push_back(packet);
}
let response = MdnsPacket::PeerResponse {
query_id,
peer_id: *params.local_peer_id(),
addresses: params.listened_addresses().collect(),
ttl: self.ttl,
};
self.send_packet(response);
}
MdnsPacket::Response(response) => {
MdnsPacket::PeerResponse {
query_id: _,
peer_id,
addresses,
ttl,
} => {
// We replace the IP address with the address we observe the
// remote as and the address they listen on.
let obs_ip = Protocol::from(response.remote_addr().ip());
let obs_port = Protocol::Udp(response.remote_addr().port());
let obs_ip = Protocol::from(from.ip());
let obs_port = Protocol::Udp(from.port());
let observed: Multiaddr = iter::once(obs_ip).chain(iter::once(obs_port)).collect();

let mut discovered: SmallVec<[_; 4]> = SmallVec::new();
for peer in response.discovered_peers() {
if peer.id() == params.local_peer_id() {
continue;
}

let new_expiration = Instant::now() + peer.ttl();

let mut addrs: Vec<Multiaddr> = Vec::new();
for addr in peer.addresses() {
if let Some(new_addr) = address_translation(&addr, &observed) {
addrs.push(new_addr.clone())
}
addrs.push(addr.clone())
if peer_id == *params.local_peer_id() {
return;
}
let new_expiration = Instant::now() + ttl;
let mut addrs: Vec<Multiaddr> = Vec::new();
for addr in addresses {
if let Some(new_addr) = address_translation(&addr, &observed) {
addrs.push(new_addr.clone())
}

for addr in addrs {
if let Some((_, _, cur_expires)) = self
.discovered_nodes
.iter_mut()
.find(|(p, a, _)| p == peer.id() && *a == addr)
{
*cur_expires = cmp::max(*cur_expires, new_expiration);
} else {
self.discovered_nodes
.push((*peer.id(), addr.clone(), new_expiration));
discovered.push((*peer.id(), addr));
}
addrs.push(addr.clone())
}
for addr in addrs {
if let Some((_, _, cur_expires)) = self
.discovered_nodes
.iter_mut()
.find(|(p, a, _)| p == &peer_id && *a == addr)
{
*cur_expires = cmp::max(*cur_expires, new_expiration);
} else {
self.discovered_nodes
.push((peer_id, addr.clone(), new_expiration));
discovered.push((peer_id, addr));
}
}

Expand All @@ -244,10 +269,14 @@ impl Mdns {
inner: discovered.into_iter(),
}));
}
MdnsPacket::ServiceDiscovery(disc) => {
let resp = build_service_discovery_response(disc.query_id(), self.ttl);
self.send_buffer.push_back(resp);
MdnsPacket::ServiceRequest { query_id } => {
let response = MdnsPacket::ServiceResponse {
query_id,
ttl: self.ttl,
};
self.send_packet(response);
}
MdnsPacket::ServiceResponse { .. } => {}
}
}
}
Expand Down Expand Up @@ -279,8 +308,7 @@ impl NetworkBehaviour for Mdns {
}

fn inject_new_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
self.timeout
.set_interval_at(Instant::now(), self.query_interval);
self.fire_timer();
}

fn poll(
Expand All @@ -307,8 +335,7 @@ impl NetworkBehaviour for Mdns {
if let Err(err) = socket.join_multicast_v4(&multicast, &addr) {
log::error!("join multicast failed: {}", err);
} else {
self.timeout
.set_interval_at(Instant::now(), self.query_interval);
self.fire_timer();
}
}
}
Expand All @@ -318,8 +345,7 @@ impl NetworkBehaviour for Mdns {
if let Err(err) = socket.join_multicast_v6(&multicast, 0) {
log::error!("join multicast failed: {}", err);
} else {
self.timeout
.set_interval_at(Instant::now(), self.query_interval);
self.fire_timer();
}
}
}
Expand Down Expand Up @@ -358,12 +384,10 @@ impl NetworkBehaviour for Mdns {
.recv_from(&mut self.recv_buffer)
.now_or_never()
{
Some(Ok((len, from))) => {
if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from)
{
self.inject_mdns_packet(packet, params);
}
}
Some(Ok((len, from))) => match MdnsPacket::from_bytes(&self.recv_buffer[..len]) {
Ok(packet) => self.inject_mdns_packet(packet, from, params),
Err(err) => log::error!("failed to parse mdns packet from {}: {}", from, err),
},
Some(Err(err)) => log::error!("Failed reading datagram: {}", err),
_ => {}
}
Expand All @@ -382,7 +406,9 @@ impl NetworkBehaviour for Mdns {
}
} else if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
log::trace!("sending query");
self.send_buffer.push_back(build_query());
self.send_packet(MdnsPacket::PeerRequest {
query_id: rand::random(),
});
} else {
break;
}
Expand Down
Loading