Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor code to use Python3 types and added Tox linting and tests #38

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions .github/workflows/python-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: Tox tests

on:
- push
- pull_request

jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11.0-alpha - 3.11"]

steps:
- uses: actions/checkout@v1
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install tox tox-gh-actions
- name: Test with tox
run: tox
139 changes: 68 additions & 71 deletions multiping/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import struct
import time
import errno
from typing import List, Optional, Tuple, Dict, Union

# Packet header operations in Python are most easiest done by using the
# struct package and packing values according to specific formats. For
Expand All @@ -31,31 +32,30 @@
_ICMP_HDR_PACK_FORMAT = "!BBHHH"

# Some offsets we use when extracting data from the header
_ICMP_HDR_OFFSET = 20
_ICMP_ID_OFFSET = _ICMP_HDR_OFFSET + 4
_ICMP_IDENT_OFFSET = _ICMP_HDR_OFFSET + 6
_ICMP_PAYLOAD_OFFSET = _ICMP_HDR_OFFSET + 8
_ICMP_ECHO_REQUEST = 8
_ICMP_ECHO_REPLY = 0

_ICMPV6_HDR_OFFSET = 0
_ICMPV6_ID_OFFSET = _ICMPV6_HDR_OFFSET + 4
_ICMPV6_IDENT_OFFSET = _ICMPV6_HDR_OFFSET + 6
_ICMP_HDR_OFFSET = 20
_ICMP_ID_OFFSET = _ICMP_HDR_OFFSET + 4
_ICMP_IDENT_OFFSET = _ICMP_HDR_OFFSET + 6
_ICMP_PAYLOAD_OFFSET = _ICMP_HDR_OFFSET + 8
_ICMP_ECHO_REQUEST = 8
_ICMP_ECHO_REPLY = 0

_ICMPV6_HDR_OFFSET = 0
_ICMPV6_ID_OFFSET = _ICMPV6_HDR_OFFSET + 4
_ICMPV6_IDENT_OFFSET = _ICMPV6_HDR_OFFSET + 6
_ICMPV6_PAYLOAD_OFFSET = _ICMPV6_HDR_OFFSET + 8
_ICMPV6_ECHO_REQUEST = 128
_ICMPV6_ECHO_REPLY = 129
_ICMPV6_ECHO_REQUEST = 128
_ICMPV6_ECHO_REPLY = 129

_IPPROTO_ICMPV6 = (socket.IPPROTO_ICMPV6
if hasattr(socket, 'IPPROTO_ICMPV6')
else 58)
_IPPROTO_ICMPV6 = (socket.IPPROTO_ICMPV6
if hasattr(socket, 'IPPROTO_ICMPV6')
else 58)


class MultiPingError(Exception):
"""
Exception class for the multiping package.

"""
pass


class MultiPingSocketError(socket.gaierror):
Expand All @@ -66,12 +66,13 @@ class MultiPingSocketError(socket.gaierror):
change already existing try-except blocks that look for socket.gaierror.

"""
pass


class MultiPing(object):
class MultiPing:
_sock: socket.socket
_sock6: Optional[socket.socket] = None

def __init__(self, dest_addrs, sock=None, ignore_lookup_errors=False):
def __init__(self, dest_addrs: List[str], sock: Optional[socket.socket] = None, ignore_lookup_errors: bool = False):
"""
Initialize a new multi ping object. This takes the configuration
consisting of the list of destination addresses and an optional socket
Expand Down Expand Up @@ -100,7 +101,7 @@ def __init__(self, dest_addrs, sock=None, ignore_lookup_errors=False):
# specification of the ping targets by name, so a name lookup needs to
# be performed. If we get a mixture of IPv4 and IPv6 answers then we
# will prefer the IPv4 addresses.
self._dest_addrs = []
self._dest_addrs = []
self._unprocessed_targets = []
for d in dest_addrs:
try:
Expand All @@ -116,13 +117,13 @@ def __init__(self, dest_addrs, sock=None, ignore_lookup_errors=False):
# We found the first IPv4 address! Use this result
addr = res[4][0]
break
elif not addr:
if not addr:
# Otherwise, we record the first of the IPv6 addresses
addr = res[4][0]
# Continue the loop, since we maybe only have had IPv6
# addresses so far and some IPv4 ones are still to come.

except socket.gaierror:
except socket.gaierror as exc:
if self._ignore_lookup_errors:
# Silently ignore name lookup errors. We can't do anything
# for those hosts. They will be collected in a list of
Expand All @@ -135,7 +136,7 @@ def __init__(self, dest_addrs, sock=None, ignore_lookup_errors=False):
# error that we received. This exception class has
# socket.gaierror as base class, so try-except blocks that
# are looking for socket.gaierror will still work.
raise MultiPingSocketError("Cannot lookup '%s'" % d)
raise MultiPingSocketError(f"Cannot lookup '{d}'") from exc

if addr:
self._dest_addrs.append(addr)
Expand All @@ -144,62 +145,59 @@ def __init__(self, dest_addrs, sock=None, ignore_lookup_errors=False):
# process those.
self._unprocessed_targets.append(d)

self._id_to_addr = {}
self._remaining_ids = None
self._last_used_id = None
self._id_to_addr: Dict[int, str] = {}
self._remaining_ids: Optional[List[int]] = None
self._last_used_id: Optional[int] = None
self._time_stamp_size = struct.calcsize("d")

self._receive_has_been_called = False
self._ipv6_address_present = False
self._ipv6_address_present = False

# use pid as identifier to filter receive pack from different
# process echo
self.ident = os.getpid() & 0xffff

# Open an ICMP socket, if we weren't provided with one already
if sock:
self._sock = sock
self._sock = sock
self._sock6 = None
else:
self._open_ipv4_icmp_socket()
self._open_ipv6_icmp_socket()

def _open_ipv4_icmp_socket(self):
def _open_ipv4_icmp_socket(self) -> None:
self._sock = self._open_icmp_socket(socket.AF_INET)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 131072)

def _open_ipv6_icmp_socket(self, ignore_failures=True):
def _open_ipv6_icmp_socket(self, ignore_failures: bool =True) -> None:
try:
self._sock6 = self._open_icmp_socket(socket.AF_INET6)
self._sock6.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 131072)
except socket.error:
except socket.error as exc:
if ignore_failures:
self._sock6 = None
return
else:
raise MultiPingSocketError("IPv6 address family not supported")
raise MultiPingSocketError("IPv6 address family not supported") from exc

@staticmethod
def _open_icmp_socket(family):
def _open_icmp_socket(family: int) -> socket.socket:
"""
Opens a socket suitable for sending/receiving ICMP echo
requests/responses.

"""
try:
proto = socket.IPPROTO_ICMP if family == socket.AF_INET \
else _IPPROTO_ICMPV6
else _IPPROTO_ICMPV6

return socket.socket(family, socket.SOCK_RAW, proto)

except socket.error as e:
if e.errno == 1:
raise MultiPingError("Root privileges required for sending "
"ICMP")
raise MultiPingError("Root privileges required for sending ICMP") from e
# Re-raise any other error
raise

def _checksum(self, msg):
def _checksum(self, msg: bytearray) -> int:
"""
Calculate the checksum of a packet.

Expand All @@ -209,7 +207,8 @@ def _checksum(self, msg):
Thank you to StackOverflow user Jason Orendorff.

"""
def carry_around_add(a, b):

def carry_around_add(a: int, b: int) -> int:
c = a + b
return (c & 0xffff) + (c >> 16)

Expand All @@ -221,7 +220,7 @@ def carry_around_add(a, b):

return s

def _send_ping(self, dest_addr, payload):
def _send_ping(self, dest_addr: str, payload: bytes) -> None:
"""
Send a single ICMPecho (ping) packet to the specified address.

Expand All @@ -246,9 +245,9 @@ def _send_ping(self, dest_addr, payload):
# - packet id (unsigned short)
# - sequence = 0 (unsigned short) This doesn't have to be 0.
dummy_header = bytearray(
struct.pack(_ICMP_HDR_PACK_FORMAT,
icmp_echo_request, 0, 0,
pkt_id, self.ident))
struct.pack(_ICMP_HDR_PACK_FORMAT,
icmp_echo_request, 0, 0,
pkt_id, self.ident))

# Calculate the checksum over the combined dummy header and payload
checksum = self._checksum(dummy_header + payload)
Expand All @@ -257,9 +256,9 @@ def _send_ping(self, dest_addr, payload):
# checksum. Need to make sure to convert checksum to network byte
# order.
real_header = bytearray(
struct.pack(_ICMP_HDR_PACK_FORMAT,
icmp_echo_request, 0, checksum,
pkt_id, self.ident))
struct.pack(_ICMP_HDR_PACK_FORMAT,
icmp_echo_request, 0, checksum,
pkt_id, self.ident))

# Full packet consists of header plus payload
full_pkt = real_header + payload
Expand All @@ -269,18 +268,18 @@ def _send_ping(self, dest_addr, payload):
# for that.
full_dest_addr = (dest_addr, 0)

if is_ipv6:
if is_ipv6 and self._sock6:
socket.inet_pton(socket.AF_INET6, dest_addr)
try:
self._sock6.sendto(full_pkt, full_dest_addr)
except Exception:
except OSError:
# on systems without IPv6 connectivity, sendto will fail with
# 'No route to host'
pass
else:
self._sock.sendto(full_pkt, full_dest_addr)

def send(self):
def send(self) -> None:
"""
Send pings to multiple addresses, ensuring unique IDs for each request.

Expand All @@ -295,8 +294,7 @@ def send(self):
if not self._receive_has_been_called:
all_addrs = self._dest_addrs
else:
all_addrs = [a for (i, a) in list(self._id_to_addr.items())
if i in self._remaining_ids]
all_addrs = [a for (i, a) in list(self._id_to_addr.items()) if i in self._remaining_ids] if self._remaining_ids else []

if self._last_used_id is None:
# Will attempt to continue at the last request ID we used. But if
Expand All @@ -317,7 +315,7 @@ def send(self):
# response and allows us to calculate the 'ping time'.
self._send_ping(addr, payload=struct.pack("d", time.time()))

def _read_all_from_socket(self, timeout):
def _read_all_from_socket(self, timeout: float) -> List[Tuple[bytearray, float]]:
"""
Read all packets we currently can on the socket.

Expand Down Expand Up @@ -364,7 +362,7 @@ def _read_all_from_socket(self, timeout):
# re-raise in that case.
raise

if self._ipv6_address_present:
if self._ipv6_address_present and self._sock6:
try:
self._sock6.settimeout(timeout)
while True:
Expand All @@ -381,7 +379,7 @@ def _read_all_from_socket(self, timeout):

return pkts

def receive(self, timeout):
def receive(self, timeout: float) -> Tuple[Dict[str, int], List[str]]:
"""
Receive ping responses from the socket. Attempts to read responses for
all stored IDs (as generated by send()).
Expand Down Expand Up @@ -409,7 +407,7 @@ def receive(self, timeout):
self._remaining_ids = list(self._id_to_addr.keys())

remaining_time = timeout
results = {}
results = {}

# Keep looping until we either have responses for all request IDs, or
# no more time is left.
Expand All @@ -426,21 +424,21 @@ def receive(self, timeout):
if pkt[_ICMPV6_HDR_OFFSET] == _ICMPV6_ECHO_REPLY:

pkt_id = (pkt[_ICMPV6_ID_OFFSET] << 8) + \
pkt[_ICMPV6_ID_OFFSET + 1]
pkt[_ICMPV6_ID_OFFSET + 1]
pkt_ident = (pkt[_ICMPV6_IDENT_OFFSET] << 8) + \
pkt[_ICMPV6_IDENT_OFFSET + 1]
pkt[_ICMPV6_IDENT_OFFSET + 1]
payload = pkt[_ICMPV6_PAYLOAD_OFFSET:]

elif pkt[_ICMP_HDR_OFFSET] == _ICMP_ECHO_REPLY:

pkt_id = (pkt[_ICMP_ID_OFFSET] << 8) + \
pkt[_ICMP_ID_OFFSET + 1]
pkt[_ICMP_ID_OFFSET + 1]
pkt_ident = (pkt[_ICMP_IDENT_OFFSET] << 8) + \
pkt[_ICMP_IDENT_OFFSET + 1]
pkt[_ICMP_IDENT_OFFSET + 1]
payload = pkt[_ICMP_PAYLOAD_OFFSET:]

if pkt_ident == self.ident and \
pkt_id in self._remaining_ids:
pkt_id in self._remaining_ids:
# The sending timestamp was encoded in the echo request
# body and is now returned to us in the response. Note
# that network byte order doesn't matter here, since we
Expand All @@ -466,18 +464,19 @@ def receive(self, timeout):
# just be added to the no-results return list. Without the flag
# those addresses would have caused an exception earlier.
no_results_so_far.extend(self._unprocessed_targets)
return (results, no_results_so_far)
return results, no_results_so_far

def __del__(self):
def __del__(self) -> None:
"""
Close sockets descriptors.
Close sockets descriptors.
"""
self._sock.close()

if self._sock6:
self._sock6.close()


def multi_ping(dest_addrs, timeout, retry=0, ignore_lookup_errors=False):
def multi_ping(dest_addrs: List[str], timeout: Union[int, float], retry: int = 0, ignore_lookup_errors: bool = False) -> Tuple[Dict[str, int], List[str]]:
"""
Combine send and receive measurement into single function.

Expand All @@ -496,15 +495,13 @@ def multi_ping(dest_addrs, timeout, retry=0, ignore_lookup_errors=False):
Those targets simply appear in the 'no_results' return list.

"""
retry = int(retry)
if retry < 0:
retry = 0

timeout = float(timeout)
retry = max(int(retry), 0)
if isinstance(timeout, int):
timeout = float(timeout)
if timeout < 0.1:
raise MultiPingError("Timeout < 0.1 seconds not allowed")

retry_timeout = float(timeout) / (retry + 1)
retry_timeout = timeout / (retry + 1)
if retry_timeout < 0.1:
raise MultiPingError("Time between ping retries < 0.1 seconds")

Expand Down