Find file
Fetching contributors…
Cannot retrieve contributors at this time
executable file 440 lines (368 sloc) 17.5 KB
#!/usr/bin/env python
## DDNS (Dynamic DNS) command line client / updater
## Easy to use command line utility for creating
## and updating forward and reverse DNS entries
## in dynamically updatable domains.
## Supports zones on different servers, supports
## different keys for each zone, automatically
## creates reverse record and removes obsoleted
## ones.
## Author: Michal Ludvig <>
## License: GPL version 2
import os
import sys
import re
import socket
import argparse
import dns.query
import dns.tsigkeyring
import dns.update
import dns.reversename
import dns.resolver
# Normalise config settings
ZONE_SERVER = dict([(k.lower().strip('.'), v) for (k, v) in ZONE_SERVER.iteritems()])
ZONE_KEY = dict([(k.lower().strip('.'), v) for (k, v) in ZONE_KEY.iteritems()])
SERVER_KEY = dict([(k.lower().strip('.'), v) for (k, v) in SERVER_KEY.iteritems()])
keyring = {}
class DDnsError(Exception):
class KeyNotFoundInFile(DDnsError):
def __init__(self, key_file, key_name):
self.key_file = key_file
self.key_name = key_name
def __str__(self):
return "%s: %s" % (self.key_file, self.key_name)
class KeyNotFound(DDnsError):
def __init__(self, zone, server): = zone
self.server = server
def __str__(self):
return "zone: %s, server: %s" % (, self.server)
class DDnsServerNotFound(DDnsError):
def __str__(self):
return "Server not found for zone: %s" % (self.message)
class ParameterError(DDnsError):
def debug(message):
print("DEBUG: %s" % message)
def info(message):
print("INFO: %s" % message)
def warning(message):
print("WARNING: %s" % message)
def error(message):
print("ERROR: %s" % message)
def fixup_key_algo(key_algo):
return key_algo == "hmac-md5" and "HMAC-MD5.SIG-ALG.REG.INT" or key_algo
def parse_key_file(key_file, key_name):
f = open(key_file, "r").read()
pat = re.compile('key\s+[\'"]?(\S+?)[\'"]?\s*{.*?algorithm\s+[\'"]?([^\'";]+?)[\'"]?\s*;.*?secret\s+[\'"]?([^\'";]+?)[\'"]?\s*;.*?};', re.DOTALL | re.MULTILINE)
for key in pat.finditer(f):
key = list(key.groups())
key[1] = fixup_key_algo(key[1])
if key[0] == key_name:
return key
raise KeyNotFoundInFile(key_file, key_name)
except KeyNotFoundInFile, e:
except Exception, e:
sys.stderr.write("Can't parse a keyfile %s: %s\n" % (key_file, e))
def find_server(zone):
Figure out server IP or name for a given 'zone'.
Raise an exception if no suitable server is found.
if ZONE_SERVER.has_key(zone):
return ZONE_SERVER[zone]
elif ZONE_SERVER.has_key('default'):
return ZONE_SERVER['default']
raise DDnsServerNotFound(zone)
def find_key(zone, server):
find_key(zone, server) -> dns.tsigkeyring
Figure out a signing key usable for a given zone.
if ZONE_KEY.has_key(zone):
return ZONE_KEY[zone]
elif SERVER_KEY.has_key(server):
return SERVER_KEY[server]
raise KeyNotFound(zone, server)
def parse_key(key):
# key can have one of the following formats:
# - inline key: 'algorithm:name:AbCd=='
# - file with key name: '/etc/rndc.conf:ddns_key'
if os.path.isfile(key.split(":")[0]):
key_file, key_name = key.split(":", 1)
key_name, key_algo, key_value = parse_key_file(key_file, key_name)
key_algo, key_name, key_value = key.split(":", 2)
key_algo = fixup_key_algo(key_algo)
debug("Auth key: %s:%s:%s" % (key_algo, key_name, key_value))
return dns.tsigkeyring.from_text({key_name : key_value}), key_algo
class DDnsFQDN(str):
def parse_args():
def _add_common_args(_parser, rrvalue_nargs = ""):
_parser.add_argument("--config", type=file, help="Config file location")
_parser.add_argument("--verbose", action="store_true", help="Verbose operation")
_parser.add_argument("--debug", action="store_true", help="Extremely verbose operation")
_parser.add_argument("--dry-run", "-n", dest="dry_run", action="store_true", help="Do not make any changes.")
_parser.add_argument("--server", dest="server", action="store", help="Override server address")
_parser.add_argument("--auth-key", dest="authkey", action="store", help="Override authentication key (<name>:<algo>:<base64-encoded-key>)")
_parser.add_argument("--type", "-t", dest="rrtype", choices=("A", "AAAA", "NS", "CNAME", "PTR"), help="Record type, e.g. A, AAAA, PTR, etc.")
_parser.add_argument("--zone", "-z", dest="zone", type=DDnsFQDN, help="Zone to work with. By default for the zone would be, however you may instead want to put a record '' into the zone ''. In that case use --zone")
_parser.add_argument("rrname", help="Resource Record Name")
if rrvalue_nargs:
_parser.add_argument("rrvalue", nargs=rrvalue_nargs, help="Resource Record Value")
parser = argparse.ArgumentParser(prog="ddsn-cli")
subparsers = parser.add_subparsers(dest = "command")
parser_set = subparsers.add_parser("set", help="Create / update records")
parser_set_group_add_replace = parser_set.add_mutually_exclusive_group()
parser_set_group_add_replace.add_argument("--add", "-a", dest="replace", action="store_false", help="Add new record to existing ones.")
parser_set_group_add_replace.add_argument("--replace", dest="replace", action="store_true", default="true", help="Replace all existing records of the same type with the new one. (default)")
parser_set.add_argument("--ttl", dest="rrttl", type=int, default=TTL_DEFAULT, help="Time To Live")
_add_common_args(parser_set, rrvalue_nargs = "+")
#parser_get = subparsers.add_parser("get", help="Get all records for the given name")
#_add_common_args(parser_get, rrvalue_nargs = None)
parser_delete = subparsers.add_parser("delete", help="Delete some or all records for the given name")
parser_delete_group_reverse = parser_delete.add_mutually_exclusive_group()
parser_delete_group_reverse.add_argument("--delete-reverse", "--delete-ptr", dest="delete_reverse", action="store_const", const="delete", help="Remove related reverse records. For example if resolves to and if maps back to then delete as well.")
parser_delete_group_reverse.add_argument("--keep-reverse", "--keep-ptr", dest="delete_reverse", action="store_const", const="keep", help="Keep the reverse records in place.")
_add_common_args(parser_delete, rrvalue_nargs = "*")
return parser.parse_args()
class DDnsCli(object):
def __init__(self, args):
self.args = args
if args.command == "set":
return self.cmd_set()
#elif args.command == "get":
# return self.cmd_get()
elif args.command == "delete":
return self.cmd_delete()
raise ParameterError("Unknown command: %s" % args.command)
def compile_records(self):
records = []
if not self.args.rrtype:
for rrvalue in self.args.rrvalue:
rrtype = self.guess_rr_type(rrvalue)
if not rrtype:
raise ParameterError("%s: Unable to guess RR type. Please use --type=..." % rrvalue)
debug("Guessed: %s -> %s" % (rrvalue, rrtype))
records.append({"rrtype":rrtype, "rrvalue":rrvalue})
for rrvalue in self.args.rrvalue:
if self.verify_rr_type(rrvalue, self.args.rrtype):
records.append({"rrtype":self.args.rrtype, "rrvalue":rrvalue})
raise ParameterError("%s: Not a valid %s type value" % (rrvalue, self.args.rrtype))
return records
def guess_best_zone(self, rrname):
zone = rrname = str(rrname).lower().strip(".")
if rrname.count(".") < 1:
raise ParameterError("%s: Hostname is not fully qualified. Append your domain please." % rrname)
while zone:
if zone in ZONE_SERVER:
host = rrname[:-len(zone)].strip(".")
debug("guess_best_zone(%s) [found-zone]: host=%s, zone=%s" % (rrname, host, zone))
return host, zone
zone = ".".join(zone.split(".")[1:])
host, zone = rrname.split(".", 1)
debug("guess_best_zone(%s) [fallback]: host=%s, zone=%s" % (rrname, host, zone))
return host, zone
def init_ddns_args(self):
ddns_args = {}
# Explicit zone was specified
ddns_args['host'] = self.args.rrname.replace(, "")
ddns_args['zone'] =
# No explicit zone. Guess it from FQDN (rrname)
ddns_args['host'], ddns_args['zone'] = self.guess_best_zone(self.args.rrname)
ddns_args['host'] = ddns_args['host'].strip(".")
ddns_args['zone'] = ddns_args['zone'].strip(".")
if self.args.server:
ddns_args['server'] = self.args.server
ddns_args['server'] = find_server(ddns_args['zone'])
info("Server for zone '%(zone)s': %(server)s" % ddns_args)
if self.args.authkey:
ddns_args['key'] = self.args.authkey
ddns_args['key'] = find_key(ddns_args['zone'], ddns_args['server'])
except KeyNotFound, e:
# If the server addr was overriden by --server arg
# retry with server found in the config
if e.server == self.args.server:
conf_server = find_server(ddns_args['zone'])
ddns_args['key'] = find_key(ddns_args['zone'], conf_server)
ddns_args['keyring'], ddns_args['keyalgorithm'] = parse_key(ddns_args['key'])
return ddns_args
def guess_rr_type(self, rrvalue):
if self.verify_rr_type(rrvalue, "A"):
return "A"
if self.verify_rr_type(rrvalue, "AAAA"):
return "AAAA"
return None
def verify_rr_type(self, rrvalue, rrtype):
if rrtype == "A":
socket.inet_pton(socket.AF_INET, rrvalue)
# inet_pton() accepts shortened IPv4 addresses,
# e.g. "127.1" - we don't allow that. Count the
# number of '.' in the address to make sure it's
# in a quad-byte form.
return rrvalue.count('.') == 3
except socket.error:
return False
if rrtype == "AAAA":
socket.inet_pton(socket.AF_INET6, rrvalue)
return True
except socket.error:
return False
if rrtype in [ "PTR", "NS", "CNAME" ]:
return self.verify_fqdn(rrvalue)
error("Unsupported RR type: %s" % rrtype)
return False
def verify_fqdn(self, rrvalue):
return rrvalue.count(".") > 0
def cmd_set(self):
ddns_args = self.init_ddns_args()
records = self.compile_records()
debug("ddns_args: %r" % ddns_args)
update = dns.update.Update(ddns_args['zone'], keyring=ddns_args['keyring'], keyalgorithm = ddns_args['keyalgorithm'])
if self.args.replace:
op_str = "REPLACE"
op_method = update.replace
op_str = "ADD"
op_method = update.add
for record in records:
info("%s: %s.%s(%s, %s)" % (op_str, ddns_args['host'], ddns_args['zone'], record['rrvalue'], record['rrtype']))
if not self.args.dry_run:
op_method(ddns_args['host'], self.args.rrttl, record['rrtype'], record['rrvalue'])
response = dns.query.tcp(update, ddns_args['server'])
info("%s, %s" % (op_str, dns.rcode.to_text(response.rcode())))
if record['rrtype'] == 'A' or record['rrtype'] == 'AAAA':
rev_host, rev_zone = self.guess_best_zone(dns.reversename.from_address(record['rrvalue']))
rev_value = "%s.%s." % (ddns_args['host'], ddns_args['zone'])
ptr_update = dns.update.Update(rev_zone, keyring=ddns_args['keyring'], keyalgorithm = ddns_args['keyalgorithm'])
# Always replace even if we are doing an add in forward zone
ptr_update.replace(rev_host, self.args.rrttl, 'PTR', rev_value)
info("%s: %s.%s(%s, %s)" % (op_str, rev_host, rev_zone, rev_value, 'PTR'))
if not self.args.dry_run:
response = dns.query.tcp(ptr_update, ddns_args['server'])
info("%s: %s" % (op_str, dns.rcode.to_text(response.rcode())))
def cmd_delete_reverse(self, ddns_args, records):
revs = []
resolver = dns.resolver.Resolver()
resolver.nameservers = [ddns_args['server']]
rrname = self.args.rrname.strip(".")
# First delete any REV zone A or AAAA records that might exist
for rev_type in ("A", "AAAA"):
debug("resolving(%s, %s)" % (rrname, rev_type))
answers = resolver.query(rrname, rev_type)
for rdata in answers:
rev_rrtype = dns.rdatatype.to_text(rdata.rdtype)
revs.append({'type': rev_rrtype, 'value': str(rdata)})
debug("(%s, %s) -> %s" % (rrname, revs[-1]['type'], revs[-1]['value']))
if not revs:
if self.args.delete_reverse == None:
warning("Reverse records found. Switchnig to --dry-run mode.")
warning("Use --delete-reverse or --keep-reverse to perform the changes.")
self.args.dry_run = True
for rev in revs:
flag = 0
rev_verified = 0
rev_value = dns.reversename.from_address(rev['value'])
rev_host, rev_zone = self.guess_best_zone(rev_value)
rev_name = "%s.%s" % (rev_host, rev_zone)
# Verify that rev_value resolves back to rrname
debug("resolving(%s, PTR)" % (rev_value))
answers = resolver.query(rev_value, "PTR")
for rdata in answers:
rev_rev_name = str(rdata).rstrip(".")
if rev_rev_name == rrname:
debug("OK (%s, PTR)==%s" % (rev_value, rev_rev_name))
warning("Reverse-verification failed: %s(PTR)=>%s (expected: %s)" % (rev_value, rev_rev_name, rrname))
warning("Unable to resolve %s(PTR)" % (rev_value))
ptr_update = dns.update.Update(rev_zone, keyring=ddns_args['keyring'], keyalgorithm = ddns_args['keyalgorithm'])
if not records:
if not self.args.rrtype:
info("DELETE: %s(PTR)" % rev_name)
flag = 1
if rev['type'] == self.args.rrtype:
info("DELETE: %s(PTR)" % (rev_name))
ptr_update.delete(rev_name, 'PTR')
flag = 1
for record in records:
if rev['type'] == record['rrtype']:
if rev['value'] == record['rrvalue']:
info("DELETE: %s(PTR)" % (rev_name))
ptr_update.delete(rev_name, 'PTR')
flag = 1
if flag == 1:
if not self.args.dry_run:
response = dns.query.tcp(ptr_update, ddns_args['server'])
debug("REV ZONE DELETE: %s" % (dns.rcode.to_text(response.rcode())))
def cmd_delete(self):
ddns_args = self.init_ddns_args()
records = self.compile_records()
debug("ddns_args: %r" % ddns_args)
print "delete_reverse=%r" % self.args.delete_reverse
if self.args.delete_reverse in ["delete", None]:
self.cmd_delete_reverse(ddns_args, records)
# Delete FWD zone records
update = dns.update.Update(ddns_args['zone'], keyring=ddns_args['keyring'], keyalgorithm = ddns_args['keyalgorithm'])
if not records:
if not self.args.rrtype:
info("DELETE: %s.%s(ANY, ANY)" % (ddns_args['host'], ddns_args['zone']))
info("DELETE: %s.%s(%s, ANY)" % (ddns_args['host'], ddns_args['zone'], self.args.rrtype))
update.delete(ddns_args['host'], self.args.rrtype)
for record in records:
info("DELETE: %s.%s(%s, %s)" % (ddns_args['host'], ddns_args['zone'], record['rrtype'], record['rrvalue']))
update.delete(ddns_args['host'], record['rrtype'], record['rrvalue'])
if not self.args.dry_run:
response = dns.query.tcp(update, ddns_args['server'])
info("DELETE: %s" % (dns.rcode.to_text(response.rcode())))
if __name__ == "__main__":
args = parse_args()
ddns_cli = DDnsCli(args)
except DDnsError, e:
sys.stderr.write("ERROR: %s\n" % e)
# vim:expandtab:ts=4:softtabstop=4:autoindent