Skip to content

Commit

Permalink
fix(activity): detect ipv4 mapped ipv6 connections
Browse files Browse the repository at this point in the history
In the ActiveConnection check, properly handle the case where a service
listening on an IPv4 address is targeted by an IPv6 connection that
uses the special IPv6 range for mapping towards IPv4 addresses. This is
realized by transforming all IPv4 addresses into the IPv6 space.
Hopefully, this catches all corner-cases.

Fixes: #116
  • Loading branch information
languitar committed Oct 23, 2021
1 parent f3db876 commit a81e456
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
18 changes: 16 additions & 2 deletions src/autosuspend/checks/activity.py
Expand Up @@ -85,10 +85,23 @@ def __init__(self, name: str, ports: Iterable[int]) -> None:
Activity.__init__(self, name)
self._ports = ports

def normalize_address(
self, family: socket.AddressFamily, address: str
) -> Tuple[socket.AddressFamily, str]:
if family == socket.AF_INET6:
# strip scope
return family, address.split("%")[0]
elif family == socket.AF_INET:
# convert to IPv6 to handle cases where an IPv4 address is targeted via IPv6
# to IPv4 mapping
return socket.AF_INET6, f"::ffff:{address}"
else:
return family, address

def check(self) -> Optional[str]:
# Find the addresses of the system
own_addresses = [
(item.family, item.address.split("%")[0])
self.normalize_address(item.family, item.address)
for sublist in psutil.net_if_addrs().values()
for item in sublist
]
Expand All @@ -97,7 +110,8 @@ def check(self) -> Optional[str]:
connection.laddr[1]
for connection in psutil.net_connections()
if (
(connection.family, connection.laddr[0]) in own_addresses
self.normalize_address(connection.family, connection.laddr[0])
in own_addresses
and connection.status == "ESTABLISHED"
and connection.laddr[1] in self._ports
)
Expand Down
11 changes: 11 additions & 0 deletions tests/test_checks_activity.py
Expand Up @@ -345,6 +345,17 @@ def test_smoke(self) -> None:
"ESTABLISHED",
None,
),
# ipv6 with mapping to ipv4
# https://github.com/languitar/autosuspend/issues/116
psutil._common.sconn(
-1,
socket.AF_INET6,
socket.SOCK_STREAM,
(f"::ffff:{MY_ADDRESS}", MY_PORT),
("42.42.42.42", 42),
"ESTABLISHED",
None,
),
],
)
def test_connected(
Expand Down

0 comments on commit a81e456

Please sign in to comment.