Skip to content

Commit

Permalink
Implement WAN IP detection using DNS queries
Browse files Browse the repository at this point in the history
  • Loading branch information
infothrill committed Jan 12, 2018
1 parent 5d89b60 commit 2c09e49
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Expand Up @@ -4,6 +4,7 @@ Release history
---------------
0.5.0 (unreleased)
++++++++++++++++++
- added: WAN IP detection through DNS (detector 'dnswanip')
- improved: replaced built-in daemon code with `daemonocle <https://pypi.python.org/pypi/daemonocle>`_
- switched to `pytest <https://pytest.org>`_ for running tests
- changed (**INCOMPATIBLE**): dropped support for python 2.6 and python 3.3
Expand Down
1 change: 1 addition & 0 deletions dyndnsc/detector/builtin.py
Expand Up @@ -11,6 +11,7 @@
_BUILTINS = (
("dyndnsc.detector.command", "IPDetector_Command"),
("dyndnsc.detector.dns", "IPDetector_DNS"),
("dyndnsc.detector.dnswanip", "IPDetector_DnsWanIp"),
("dyndnsc.detector.iface", "IPDetector_Iface"),
("dyndnsc.detector.socket_ip", "IPDetector_Socket"),
("dyndnsc.detector.rand", "IPDetector_Random"),
Expand Down
89 changes: 89 additions & 0 deletions dyndnsc/detector/dnswanip.py
@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-

"""Module containing logic for DNS WAN IP detection.
See also https://www.cyberciti.biz/faq/how-to-find-my-public-ip-address-from-command-line-on-a-linux/
"""
from __future__ import absolute_import

import socket
import logging

import dns.resolver

from .base import IPDetector, AF_INET, AF_INET6

LOG = logging.getLogger(__name__)


def find_ip(family=AF_INET, flavour="opendns"):
"""Find the publicly visible IP address of the current system.
This uses public DNS infrastructure that implement a special DNS "hack" to
return the IP address of the requester rather than some other address.
:param family: address family, optional, default AF_INET (ipv4)
:param flavour: selector for public infrastructure provider, optional
"""
flavours = {
"opendns": {
AF_INET: {
"@": ("resolver1.opendns.com", "resolver2.opendns.com"),
"qname": "myip.opendns.com",
"rdtype": "A",
},
AF_INET6: {
"@": ("resolver1.ipv6-sandbox.opendns.com", "resolver2.ipv6-sandbox.opendns.com"),
"qname": "myip.opendns.com",
"rdtype": "AAAA",
},
},
}

flavour = flavours["opendns"]
resolver = dns.resolver.Resolver()
resolver.nameservers = [socket.gethostbyname(h) for h in flavour[family]["@"]]

answers = resolver.query(qname=flavour[family]["qname"], rdtype=flavour[family]["rdtype"])
for rdata in answers:
return rdata.address
return None


class IPDetector_DnsWanIp(IPDetector):
"""Class to discover the internet visible IP address using publicly available DNS infrastructure."""

def __init__(self, family=None, *args, **kwargs):
"""
Initializer.
:param family: IP address family (default: '' (ANY), also possible: 'INET', 'INET6')
"""
if family is None:
family = AF_INET
super(IPDetector_DnsWanIp, self).__init__(*args, family=family, **kwargs)

@staticmethod
def names():
"""Return a list of string names identifying this class/service."""
return ("dnswanip",)

def can_detect_offline(self):
"""Return false, as this detector generates dns traffic.
:return: False
"""
return False

def detect(self):
"""
Detect the WAN IP of the current process through DNS.
Depending on the 'family' option, either ipv4 or ipv6 resolution is
carried out.
:return: ip address
"""
theip = find_ip(family=self.opts_family)
self.set_current_value(theip)
return theip
16 changes: 8 additions & 8 deletions dyndnsc/tests/detector/test_all.py
Expand Up @@ -7,6 +7,13 @@

from dyndnsc.detector.base import AF_INET, AF_INET6, AF_UNSPEC

HAVE_IPV6 = True
try:
import socket
socket.socket(socket.AF_INET6, socket.SOCK_DGRAM).connect(("ipv6.google.com", 0))
except (OSError, socket.error, socket.gaierror):
HAVE_IPV6 = False


class TestPluginDetectors(unittest.TestCase):
"""Test cases for detector discovery and management."""
Expand Down Expand Up @@ -80,14 +87,7 @@ def test_dns_detector(self):
self.assertEqual(AF_INET, detector.af())
self.assertTrue(detector.detect() in ("127.0.0.1", ))
# test address family restriction to ipv6:
have_ipv6 = True
import socket
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
try:
s.connect(("ipv6.google.com", 0))
except Exception:
have_ipv6 = False
if have_ipv6:
if HAVE_IPV6:
detector = ns.IPDetector_DNS(hostname="localhost", family="INET6")
self.assertEqual(AF_INET6, detector.af())
val = detector.detect()
Expand Down
58 changes: 58 additions & 0 deletions dyndnsc/tests/detector/test_dnswanip.py
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-

"""Tests for detectors."""


import unittest

import pytest

from dyndnsc.common.six import string_types
from dyndnsc.common.six import ipaddress
from dyndnsc.detector.base import AF_INET, AF_INET6
from dyndnsc.detector.dnswanip import IPDetector_DnsWanIp

HAVE_IPV6 = True
try:
import socket
socket.socket(socket.AF_INET6, socket.SOCK_DGRAM).connect(("ipv6.google.com", 0))
except (OSError, socket.error, socket.gaierror):
HAVE_IPV6 = False


class TestIndividualDetectors(unittest.TestCase):
"""Test cases for detectors."""

def test_dnswanip_detector_class(self):
"""Run basic tests for IPDetector_DnsWanIp."""
self.assertTrue("dnswanip" in IPDetector_DnsWanIp.names())
detector = IPDetector_DnsWanIp()
self.assertFalse(detector.can_detect_offline())
self.assertEqual(None, detector.get_current_value())
# default family should be ipv4:
detector = IPDetector_DnsWanIp(family=None)
self.assertEqual(AF_INET, detector.af())
detector = IPDetector_DnsWanIp(family=AF_INET)
self.assertEqual(AF_INET, detector.af())
detector = IPDetector_DnsWanIp(family=AF_INET6)
self.assertEqual(AF_INET6, detector.af())

def test_dnswanip_detector_ipv4(self):
"""Run ipv4 tests for IPDetector_DnsWanIp."""
detector = IPDetector_DnsWanIp(family=AF_INET)
result = detector.detect()
self.assertTrue(isinstance(result, (type(None),) + string_types), type(result))
# ensure the result is in fact an IP address:
self.assertNotEqual(ipaddress(result), None)
self.assertEqual(detector.get_current_value(), result)

@pytest.mark.skipif(not HAVE_IPV6, reason="requires ipv6 connectivity")
def test_dnswanip_detector_ipv6(self):
"""Run ipv6 tests for IPDetector_DnsWanIp."""
if HAVE_IPV6: # allow running test in IDE without pytest support
detector = IPDetector_DnsWanIp(family=AF_INET6)
result = detector.detect()
self.assertTrue(isinstance(result, (type(None),) + string_types), type(result))
# ensure the result is in fact an IP address:
self.assertNotEqual(ipaddress(result), None)
self.assertEqual(detector.get_current_value(), result)
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -90,7 +90,7 @@ def patch_install_requires(requires):
url="https://github.com/infothrill/python-dyndnsc",
setup_requires=["pytest-runner"],
install_requires=patch_install_requires(
["requests>=2.0.1", "setuptools", "netifaces>=0.10.5", "daemonocle>=1.0.1"]),
["requests>=2.0.1", "setuptools", "netifaces>=0.10.5", "daemonocle>=1.0.1", "dnspython>=1.15.0"]),
entry_points=("""
[console_scripts]
dyndnsc=dyndnsc.cli:main
Expand Down

0 comments on commit 2c09e49

Please sign in to comment.