Skip to content

Commit

Permalink
support interface selection (#137)
Browse files Browse the repository at this point in the history
Added two new methods for `ServiceDaemon`: `enable_interface` and `disable_interface`.
  • Loading branch information
keepsimple1 committed Oct 18, 2023
1 parent 28a18a2 commit 357b258
Show file tree
Hide file tree
Showing 5 changed files with 400 additions and 47 deletions.
14 changes: 7 additions & 7 deletions src/dns_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,13 +966,6 @@ impl DnsIncoming {

// decode RDATA based on the record type.
let rec: Option<DnsRecordBox> = match ty {
TYPE_A => Some(Box::new(DnsAddress::new(
&name,
ty,
class,
ttl,
self.read_ipv4().into(),
))),
TYPE_CNAME | TYPE_PTR => Some(Box::new(DnsPointer::new(
&name,
ty,
Expand Down Expand Up @@ -1004,6 +997,13 @@ impl DnsIncoming {
self.read_char_string(),
self.read_char_string(),
))),
TYPE_A => Some(Box::new(DnsAddress::new(
&name,
ty,
class,
ttl,
self.read_ipv4().into(),
))),
TYPE_AAAA => Some(Box::new(DnsAddress::new(
&name,
ty,
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod service_info;

pub use error::{Error, Result};
pub use service_daemon::{
DaemonEvent, Metrics, ServiceDaemon, ServiceEvent, UnregisterStatus,
DaemonEvent, IfKind, Metrics, ServiceDaemon, ServiceEvent, UnregisterStatus,
SERVICE_NAME_LEN_MAX_DEFAULT,
};
pub use service_info::{AsIpAddrs, IntoTxtProperties, ServiceInfo, TxtProperties, TxtProperty};
Expand Down
254 changes: 219 additions & 35 deletions src/service_daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,32 @@ impl ServiceDaemon {
self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
}

/// Include interfaces that match `if_kind` for this service daemon.
///
/// For example:
/// ```ignore
/// daemon.enable_interface("en0")?;
/// ```
pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
let if_kind_vec = if_kind.into_vec();
self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
if_kind_vec.kinds,
)))
}

/// Ignore/exclude interfaces that match `if_kind` for this daemon.
///
/// For example:
/// ```ignore
/// daemon.disable_interface(IfKind::IPv6)?;
/// ```
pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
let if_kind_vec = if_kind.into_vec();
self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
if_kind_vec.kinds,
)))
}

/// The main event loop of the daemon thread
///
/// In each round, it will:
Expand Down Expand Up @@ -548,7 +574,7 @@ impl ServiceDaemon {

Command::UnregisterResend(packet, ip) => {
if let Some(intf_sock) = zc.intf_socks.get(&ip) {
debug!("Send a packet length of {}", packet.len());
debug!("UnregisterResend from {}", &ip);
broadcast_on_intf(&packet[..], intf_sock);
zc.increase_counter(Counter::UnregisterResend, 1);
}
Expand Down Expand Up @@ -599,7 +625,7 @@ impl ServiceDaemon {
}
}

/// Creates a new UDP socket that uses `intf_ip` to send and recv multicast.
/// Creates a new UDP socket that uses `intf` to send and recv multicast.
fn new_socket_bind(intf: &Interface) -> Result<Socket> {
// Use the same socket for receiving and sending multicast packets.
// Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
Expand Down Expand Up @@ -636,11 +662,10 @@ fn new_socket_bind(intf: &Interface) -> Result<Socket> {
sock.set_multicast_if_v6(intf.index.unwrap_or(0))
.map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;

// Test if we can send packets successfully.
let multicast_addr = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0).into();
let test_packet = DnsOutgoing::new(0).to_packet_data();
sock.send_to(&test_packet, &multicast_addr)
.map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
// We are not sending multicast packets to test this socket as there might
// be many IPv6 interfaces on a host and could cause such send error:
// "No buffer space available (os error 55)".

Ok(sock)
}
}
Expand Down Expand Up @@ -688,6 +713,97 @@ struct IntfSock {
sock: Socket,
}

/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
///
/// Note that for ergonomic reasons, `From<&str>` and `From<IpAddr>` are implemented.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
/// All interfaces.
All,

/// All IPv4 interfaces.
IPv4,

/// All IPv6 interfaces.
IPv6,

/// By the interface name, for example "en0"
Name(String),

/// By an IPv4 or IPv6 address.
Addr(IpAddr),
}

impl IfKind {
/// Checks if `intf` matches with this interface kind.
fn matches(&self, intf: &Interface) -> bool {
match self {
IfKind::All => true,
IfKind::IPv4 => intf.ip().is_ipv4(),
IfKind::IPv6 => intf.ip().is_ipv6(),
IfKind::Name(ifname) => ifname == &intf.name,
IfKind::Addr(addr) => addr == &intf.ip(),
}
}
}

/// The first use case of specifying an interface was to
/// use an interface name. Hence adding this for ergonomic reasons.
impl From<&str> for IfKind {
fn from(val: &str) -> IfKind {
IfKind::Name(val.to_string())
}
}

impl From<&String> for IfKind {
fn from(val: &String) -> IfKind {
IfKind::Name(val.to_string())
}
}

/// Still for ergonomic reasons.
impl From<IpAddr> for IfKind {
fn from(val: IpAddr) -> IfKind {
IfKind::Addr(val)
}
}

/// A list of `IfKind` that can be used to match interfaces.
pub struct IfKindVec {
kinds: Vec<IfKind>,
}

/// A trait that converts a type into a Vec of `IfKind`.
pub trait IntoIfKindVec {
fn into_vec(self) -> IfKindVec;
}

impl<T: Into<IfKind>> IntoIfKindVec for T {
fn into_vec(self) -> IfKindVec {
let if_kind: IfKind = self.into();
IfKindVec {
kinds: vec![if_kind],
}
}
}

impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
fn into_vec(self) -> IfKindVec {
let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
IfKindVec { kinds }
}
}

/// Selection of interfaces.
struct IfSelection {
/// The interfaces to be selected.
if_kind: IfKind,

/// Whether the `if_kind` should be enabled or not.
selected: bool,
}

/// A struct holding the state. It was inspired by `zeroconf` package in Python.
struct Zeroconf {
/// Local interfaces with sockets to recv/send on these interfaces.
Expand Down Expand Up @@ -721,6 +837,9 @@ struct Zeroconf {
/// Options
service_name_len_max: u8,

/// All interface selections called to the daemon.
if_selections: Vec<IfSelection>,

/// Socket for signaling.
signal_sock: UdpSocket,

Expand Down Expand Up @@ -753,6 +872,7 @@ impl Zeroconf {
let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;

let timers = vec![];
let if_selections = vec![];

Ok(Self {
intf_socks,
Expand All @@ -766,6 +886,7 @@ impl Zeroconf {
poller,
monitors,
service_name_len_max,
if_selections,
signal_sock,
timers,
})
Expand All @@ -774,7 +895,31 @@ impl Zeroconf {
fn process_set_option(&mut self, daemon_opt: DaemonOption) {
match daemon_opt {
DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
}
}

fn enable_interface(&mut self, kinds: Vec<IfKind>) {
for if_kind in kinds {
self.if_selections.push(IfSelection {
if_kind,
selected: true,
});
}

self.process_if_selections();
}

fn disable_interface(&mut self, kinds: Vec<IfKind>) {
for if_kind in kinds {
self.if_selections.push(IfSelection {
if_kind,
selected: false,
});
}

self.process_if_selections();
}

fn notify_monitors(&mut self, event: DaemonEvent) {
Expand Down Expand Up @@ -826,6 +971,45 @@ impl Zeroconf {
key
}

/// Apply all selections to the available interfaces.
fn process_if_selections(&mut self) {
// By default, we enable all interfaces.
let interfaces = my_ip_interfaces();
let intf_count = interfaces.len();
let mut intf_selections = vec![true; intf_count];

// apply if_selections
for selection in self.if_selections.iter() {
// Mark the interfaces for this selection.
for i in 0..intf_count {
if selection.if_kind.matches(&interfaces[i]) {
intf_selections[i] = selection.selected;
}
}
}

// Update `intf_socks` based on the selections.
for (idx, intf) in interfaces.into_iter().enumerate() {
let ip_addr = intf.ip();

if intf_selections[idx] {
// Add the interface
if self.intf_socks.get(&ip_addr).is_none() {
self.add_new_interface(intf);
}
} else {
// Remove the interface
if let Some(if_sock) = self.intf_socks.remove(&ip_addr) {
if let Err(e) = self.poller.delete(&if_sock.sock) {
error!("process_if_selections: poller.delete {:?}: {}", &ip_addr, e);
}
// Remove from poll_ids
self.poll_ids.retain(|_, v| v != &ip_addr);
}
}
}
}

/// Check for IP changes and update intf_socks as needed.
fn check_ip_changes(&mut self) {
// Get the current interfaces.
Expand Down Expand Up @@ -862,37 +1046,36 @@ impl Zeroconf {

// Add newly found interfaces.
for intf in my_ifaddrs {
// Skip existing interfaces.
if self.intf_socks.get(&intf.ip()).is_some() {
continue;
if self.intf_socks.get(&intf.ip()).is_none() {
self.add_new_interface(intf);
}
}
}

// Bind the new interface.
let new_ip = intf.ip();
let sock = match new_socket_bind(&intf) {
Ok(s) => {
debug!("check_ip_changes: bind {}", &intf.ip());
s
}
Err(e) => {
debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
continue;
}
};

// Add the new interface into the poller.
let key = self.add_poll(new_ip);
if let Err(e) = self.poller.add(&sock, polling::Event::readable(key)) {
error!("check_ip_changes: poller add ip {}: {}", new_ip, e);
fn add_new_interface(&mut self, intf: Interface) {
// Bind the new interface.
let new_ip = intf.ip();
let sock = match new_socket_bind(&intf) {
Ok(s) => s,
Err(e) => {
error!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
return;
}
};

self.intf_socks.insert(new_ip, IntfSock { intf, sock });
// Add the new interface into the poller.
let key = self.add_poll(new_ip);
if let Err(e) = self.poller.add(&sock, polling::Event::readable(key)) {
error!("check_ip_changes: poller add ip {}: {}", new_ip, e);
return;
}

self.add_addr_in_my_services(new_ip);
self.intf_socks.insert(new_ip, IntfSock { intf, sock });

// Notify the monitors.
self.notify_monitors(DaemonEvent::IpAdd(new_ip));
}
self.add_addr_in_my_services(new_ip);

// Notify the monitors.
self.notify_monitors(DaemonEvent::IpAdd(new_ip));
}

/// Registers a service.
Expand Down Expand Up @@ -1205,8 +1388,7 @@ impl Zeroconf {
if let Some(records) = self.cache.ptr.get(ty_domain) {
for record in records.iter() {
if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
let info = self.create_service_info_from_cache(ty_domain, &ptr.alias);
let info = match info {
let info = match self.create_service_info_from_cache(ty_domain, &ptr.alias) {
Ok(ok) => ok,
Err(err) => {
error!("Error while creating service info from cache: {}", err);
Expand Down Expand Up @@ -1267,7 +1449,7 @@ impl Zeroconf {
}
}

// resolve A records
// resolve A and AAAA records
if let Some(records) = self.cache.addr.get(info.get_hostname()) {
for answer in records.iter() {
if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
Expand Down Expand Up @@ -1662,6 +1844,8 @@ impl fmt::Display for Command {
#[derive(Debug)]
enum DaemonOption {
ServiceNameLenMax(u8),
EnableInterface(Vec<IfKind>),
DisableInterface(Vec<IfKind>),
}

struct DnsCache {
Expand Down
Loading

0 comments on commit 357b258

Please sign in to comment.