Skip to content

Commit

Permalink
Refactor mdns packet encoding/decoding.
Browse files Browse the repository at this point in the history
  • Loading branch information
dvc94ch committed Aug 3, 2021
1 parent 03b04a5 commit 4c293d4
Show file tree
Hide file tree
Showing 6 changed files with 493 additions and 822 deletions.
3 changes: 2 additions & 1 deletion protocols/mdns/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
anyhow = "1.0.42"
async-io = "1.3.1"
byteorder = "1.4.3"
data-encoding = "2.3.2"
dns-parser = "0.8.0"
futures = "0.3.13"
if-watch = "0.2.0"
lazy_static = "1.4.0"
Expand Down
119 changes: 65 additions & 54 deletions protocols/mdns/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::packet::MdnsPacket;
use crate::IPV4_MDNS_MULTICAST_ADDRESS;
use crate::dns::{build_query, build_query_response, build_service_discovery_response};
use crate::query::MdnsPacket;
use async_io::{Async, Timer};
use futures::prelude::*;
use if_watch::{IfEvent, IfWatcher};
use libp2p_core::connection::ListenerId;
use libp2p_core::{
address_translation, multiaddr::Protocol, Multiaddr, PeerId,
};
use libp2p_core::{address_translation, multiaddr::Protocol, Multiaddr, PeerId};
use libp2p_swarm::{
protocols_handler::DummyProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters, ProtocolsHandler,
Expand Down Expand Up @@ -197,55 +194,65 @@ impl Mdns {
.set_interval_at(Instant::now(), self.query_interval);
}

fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) {
fn send_packet(&mut self, packet: MdnsPacket) {
for packet in packet.to_encoded_packets() {
self.send_buffer.push_back(packet);
}
}

fn inject_mdns_packet(
&mut self,
packet: MdnsPacket,
from: SocketAddr,
params: &impl PollParameters,
) {
match packet {
MdnsPacket::Query(query) => {
MdnsPacket::PeerRequest { query_id } => {
self.reset_timer();
log::trace!("sending response");
for packet in build_query_response(
query.query_id(),
*params.local_peer_id(),
params.listened_addresses(),
self.ttl,
) {
self.send_buffer.push_back(packet);
}
let response = MdnsPacket::PeerResponse {
query_id,
peer_id: *params.local_peer_id(),
addresses: params.listened_addresses().collect(),
ttl: self.ttl,
};
self.send_packet(response);
}
MdnsPacket::Response(response) => {
MdnsPacket::PeerResponse {
query_id: _,
peer_id,
addresses,
ttl,
} => {
// We replace the IP address with the address we observe the
// remote as and the address they listen on.
let obs_ip = Protocol::from(response.remote_addr().ip());
let obs_port = Protocol::Udp(response.remote_addr().port());
let obs_ip = Protocol::from(from.ip());
let obs_port = Protocol::Udp(from.port());
let observed: Multiaddr = iter::once(obs_ip).chain(iter::once(obs_port)).collect();

let mut discovered: SmallVec<[_; 4]> = SmallVec::new();
for peer in response.discovered_peers() {
if peer.id() == params.local_peer_id() {
continue;
}

let new_expiration = Instant::now() + peer.ttl();

let mut addrs: Vec<Multiaddr> = Vec::new();
for addr in peer.addresses() {
if let Some(new_addr) = address_translation(&addr, &observed) {
addrs.push(new_addr.clone())
}
addrs.push(addr.clone())
if peer_id == *params.local_peer_id() {
return;
}
let new_expiration = Instant::now() + ttl;
let mut addrs: Vec<Multiaddr> = Vec::new();
for addr in addresses {
if let Some(new_addr) = address_translation(&addr, &observed) {
addrs.push(new_addr.clone())
}

for addr in addrs {
if let Some((_, _, cur_expires)) = self
.discovered_nodes
.iter_mut()
.find(|(p, a, _)| p == peer.id() && *a == addr)
{
*cur_expires = cmp::max(*cur_expires, new_expiration);
} else {
self.discovered_nodes
.push((*peer.id(), addr.clone(), new_expiration));
discovered.push((*peer.id(), addr));
}
addrs.push(addr.clone())
}
for addr in addrs {
if let Some((_, _, cur_expires)) = self
.discovered_nodes
.iter_mut()
.find(|(p, a, _)| p == &peer_id && *a == addr)
{
*cur_expires = cmp::max(*cur_expires, new_expiration);
} else {
self.discovered_nodes
.push((peer_id, addr.clone(), new_expiration));
discovered.push((peer_id, addr));
}
}

Expand All @@ -262,10 +269,14 @@ impl Mdns {
inner: discovered.into_iter(),
}));
}
MdnsPacket::ServiceDiscovery(disc) => {
let resp = build_service_discovery_response(disc.query_id(), self.ttl);
self.send_buffer.push_back(resp);
MdnsPacket::ServiceRequest { query_id } => {
let response = MdnsPacket::ServiceResponse {
query_id,
ttl: self.ttl,
};
self.send_packet(response);
}
MdnsPacket::ServiceResponse { .. } => {}
}
}
}
Expand Down Expand Up @@ -373,12 +384,10 @@ impl NetworkBehaviour for Mdns {
.recv_from(&mut self.recv_buffer)
.now_or_never()
{
Some(Ok((len, from))) => {
if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from)
{
self.inject_mdns_packet(packet, params);
}
}
Some(Ok((len, from))) => match MdnsPacket::from_bytes(&self.recv_buffer[..len]) {
Ok(packet) => self.inject_mdns_packet(packet, from, params),
Err(err) => log::error!("failed to parse mdns packet from {}: {}", from, err),
},
Some(Err(err)) => log::error!("Failed reading datagram: {}", err),
_ => {}
}
Expand All @@ -397,7 +406,9 @@ impl NetworkBehaviour for Mdns {
}
} else if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
log::trace!("sending query");
self.send_buffer.push_back(build_query());
self.send_packet(MdnsPacket::PeerRequest {
query_id: rand::random(),
});
} else {
break;
}
Expand Down
Loading

0 comments on commit 4c293d4

Please sign in to comment.