Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch ros2 doctor to using psutil for network checks. #687

Merged
merged 1 commit into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ros2doctor/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

<exec_depend>ament_index_python</exec_depend>
<exec_depend>python3-catkin-pkg-modules</exec_depend>
<exec_depend>python3-ifcfg</exec_depend>
<exec_depend>python3-psutil</exec_depend>
<exec_depend>python3-importlib-metadata</exec_depend>
<exec_depend>python3-rosdistro-modules</exec_depend>
<exec_depend>rclpy</exec_depend>
Expand Down
179 changes: 150 additions & 29 deletions ros2doctor/ros2doctor/api/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
# limitations under the License.

import os
from typing import Tuple
import socket
import struct
import sys

import ifcfg
import psutil

from ros2doctor.api import DoctorCheck
from ros2doctor.api import DoctorReport
Expand All @@ -25,25 +27,106 @@
from ros2doctor.api.format import doctor_warn


def _is_unix_like_platform() -> bool:
"""Return True if conforms to UNIX/POSIX-style APIs."""
return os.name == 'posix'


def _check_network_config_helper(ifcfg_ifaces: dict) -> Tuple[bool, bool, bool]:
"""Check if loopback and multicast IP addresses are found."""
has_loopback, has_non_loopback, has_multicast = False, False, False
for iface in ifcfg_ifaces.values():
flags = iface.get('flags')
if flags:
flags = flags.lower()
if 'loopback' in flags:
has_loopback = True
else:
has_non_loopback = True
if 'multicast' in flags:
has_multicast = True
return has_loopback, has_non_loopback, has_multicast
class InterfaceFlags:
POSIX_NET_FLAGS = (
('UP', 0),
('BROADCAST', 1),
('DEBUG', 2),
('LOOPBACK', 3),

('PTP', 4),
('NOTRAILERS', 5),
('RUNNING', 6),
('NOARP', 7),

('PROMISC', 8),
('ALLMULTI', 9),
('MASTER', 10),
('SLAVE', 11),

('MULTICAST', 12),
('PORTSEL', 13),
('AUTOMEDIA', 14),
('DYNAMIC', 15),
)

def __init__(self, interface_name):
self.flags = -1
self.flag_list = set()
self.has_loopback = False
self.has_non_loopback = False
self.has_multicast = False
self.get(interface_name)

def get(self, interface_name):
if os.name != 'posix':
return

import fcntl

if sys.platform == 'darwin':
SIOCGIFFLAGS = 0xc0206911
else:
SIOCGIFFLAGS = 0x8913

# We need to pass a 'struct ifreq' to the SIOCGIFFLAGS ioctl. Nominally the
# structure looks like:
#
# struct ifreq {
# char ifr_name[IFNAMSIZ]; /* Interface name */
# union {
# struct sockaddr ifr_addr;
# struct sockaddr ifr_dstaddr;
# struct sockaddr ifr_broadaddr;
# struct sockaddr ifr_netmask;
# struct sockaddr ifr_hwaddr;
# short ifr_flags;
# int ifr_ifindex;
# int ifr_metric;
# int ifr_mtu;
# struct ifmap ifr_map;
# char ifr_slave[IFNAMSIZ];
# char ifr_newname[IFNAMSIZ];
# char *ifr_data;
# };
# };
#
# Where IFNAMSIZ is 16 bytes long. The caller (that's us) sets the ifr_name
# to the interface in question, and the call fills in the union. In the case
# of this particular ioctl, only the 'ifr_flags' is valid after the call.
# Either way, we need to provide enough room in the provided structure for the
# ioctl to fill it out, so we just provide an additional 256 bytes which should
# be more than enough.

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
try:
result = fcntl.ioctl(s.fileno(), SIOCGIFFLAGS, interface_name + '\0' * 256)
except OSError:
return

# On success, result contains the structure filled in with the info we asked
# for (the flags). We just need to extract it from the correct place.

self.flags = struct.unpack('H', result[16:18])[0]
for flagname, bit in self.POSIX_NET_FLAGS:
if self.flags & (1 << bit):
self.flag_list.add(flagname)

if 'LOOPBACK' in self.flag_list:
self.has_loopback = True
else:
self.has_non_loopback = True

if 'MULTICAST' in self.flag_list:
self.has_multicast = True

def __str__(self):
if self.flags == -1:
return ''

output_flags = ','.join(self.flag_list)

return f'{self.flags}<{output_flags}>'


class NetworkCheck(DoctorCheck):
Expand All @@ -55,11 +138,17 @@ def category(self):
def check(self):
"""Check network configuration."""
result = Result()
# check ifcfg import for windows and osx users
ifcfg_ifaces = ifcfg.interfaces()

has_loopback, has_non_loopback, has_multicast = _check_network_config_helper(ifcfg_ifaces)
if not _is_unix_like_platform():
has_loopback = False
has_non_loopback = False
has_multicast = False
for interface in psutil.net_if_addrs().keys():
flags = InterfaceFlags(interface)
has_loopback |= flags.has_loopback
has_non_loopback |= flags.has_non_loopback
has_multicast |= flags.has_multicast

if os.name != 'posix':
if not has_loopback and not has_non_loopback:
# no flags found, otherwise one of them should be True.
doctor_warn(
Expand Down Expand Up @@ -87,12 +176,44 @@ def category(self):

def report(self):
"""Print system and ROS network information."""
# check ifcfg import for windows and osx users
ifcfg_ifaces = ifcfg.interfaces()
if_stats = psutil.net_if_stats()

network_report = Report('NETWORK CONFIGURATION')
for iface in ifcfg_ifaces.values():
for k, v in iface.items():

for interface, addrs in psutil.net_if_addrs().items():
if_info = {
'inet': None,
'inet4': [],
'ether': None,
'inet6': [],
'netmask': None,
'device': interface,
'flags': None,
'mtu': None,
'broadcast': None,
nuclearsandwich marked this conversation as resolved.
Show resolved Hide resolved
}
for addr in addrs:
if addr.family == socket.AddressFamily.AF_INET:
if_info['inet'] = addr.address
if_info['inet4'].append(addr.address)
if_info['netmask'] = addr.netmask
if_info['broadcast'] = addr.broadcast
elif addr.family == socket.AddressFamily.AF_INET6:
if_info['inet6'].append(addr.address)
elif 'AF_PACKET' in socket.AddressFamily.__members__:
if addr.family == socket.AddressFamily.AF_PACKET:
# Loopback reports an all-zero MAC address, which is
# bogus and should not be reported
if addr.address != '00:00:00:00:00:00':
if_info['ether'] = addr.address

if interface in if_stats:
if_info['mtu'] = if_stats[interface].mtu

flags = InterfaceFlags(interface)
if_info['flags'] = str(flags)

for k, v in if_info.items():
if v:
network_report.add_to_report(k, v)
return network_report