diff --git a/README.md b/README.md index 2cba98d6..7844daa6 100644 --- a/README.md +++ b/README.md @@ -23,8 +23,9 @@ The currently supported DNS providers are: 7. [DNSPod](https://www.dnspod.cn/) 8. [DuckDNS](https://www.duckdns.org/) 9. [ClouDNS](https://www.cloudns.net) -10. [AWS rout353](https://aws.amazon.com/route53/) -11. [Bring your own dns provider](#bring-your-own-dns-provider) +10. [AWS route53](https://aws.amazon.com/route53/) +11. [PowerDNS](https://doc.powerdns.com/authoritative/http-api/index.html) +12. [Bring your own dns provider](#bring-your-own-dns-provider) ... Sewer can be used very easliy programmatically as a library from code. @@ -69,6 +70,9 @@ pip3 install sewer # with AWS route53 DNS Support # pip3 install sewer[route53] + +# with PowerDNS DNS Support +# pip3 install sewer[powerdns] ``` sewer(since version 0.5.0) is now python3 only. To install the (now unsupported) python2 version, run; diff --git a/setup.py b/setup.py index 4f68bdc7..ab7fdb21 100644 --- a/setup.py +++ b/setup.py @@ -29,6 +29,7 @@ "duckdns": [""], "cloudns": ["cloudns-api"], "route53": ["boto3"], + "powerdns": [""], } all_deps_of_all_dns_provider = [] @@ -111,6 +112,7 @@ "duckdns": dns_provider_deps_map["duckdns"], "cloudns": dns_provider_deps_map["cloudns"], "route53": dns_provider_deps_map["route53"], + "powerdns": dns_provider_deps_map["powerdns"], "alldns": all_deps_of_all_dns_provider, }, # If there are data files included in your packages that need to be diff --git a/sewer/__init__.py b/sewer/__init__.py index 88d49101..4cafcd2b 100644 --- a/sewer/__init__.py +++ b/sewer/__init__.py @@ -11,3 +11,4 @@ from .dns_providers import DuckDNSDns # noqa:F401 from .dns_providers import ClouDNSDns # noqa:F401 from .dns_providers import Route53Dns # noqa:F401 +from .dns_providers import PowerDNSDns diff --git a/sewer/cli.py b/sewer/cli.py index eb77ee2a..7ee6138f 100644 --- a/sewer/cli.py +++ b/sewer/cli.py @@ -306,6 +306,18 @@ def main(): except KeyError as e: logger.error("ERROR:: Please supply {0} as an environment variable.".format(str(e))) raise + elif dns_provider == "powerdns": + from . import PowerDNSDns + + try: + powerdns_api_key = os.environ["POWERDNS_API_KEY"] + powerdns_api_url = os.environ["POWERDNS_API_URL"] + + dns_class = PowerDNSDns(powerdns_api_key, powerdns_api_url) + logger.info("chosen_dns_provider. Using {0} as dns provider.".format(dns_provider)) + except KeyError as e: + logger.error("ERROR:: Please supply {0} as an environment variable.".format(str(e))) + raise else: raise ValueError("The dns provider {0} is not recognised.".format(dns_provider)) diff --git a/sewer/dns_providers/__init__.py b/sewer/dns_providers/__init__.py index e5e737fa..78352b1f 100644 --- a/sewer/dns_providers/__init__.py +++ b/sewer/dns_providers/__init__.py @@ -9,3 +9,4 @@ from .duckdns import DuckDNSDns # noqa: F401 from .cloudns import ClouDNSDns # noqa: F401 from .route53 import Route53Dns # noqa: F401 +from .powerdns import PowerDNSDns diff --git a/sewer/dns_providers/powerdns.py b/sewer/dns_providers/powerdns.py new file mode 100644 index 00000000..ae76100a --- /dev/null +++ b/sewer/dns_providers/powerdns.py @@ -0,0 +1,97 @@ +import json +import requests + +from . import common + + +class PowerDNSDns(common.BaseDns): + """ + For PowerDNS, all subdomains for a given domain live under the apex zone `zone_id`. + For example, if you want a cert for domain.tld and www.domain.tld, you need + to create two DNS records: + 1) `acme-challenge.domain.tld. IN TXT` + 2) `acme-challenge.www.domain.tld. IN TXT` + + However, both of these records must be created under `/servers/{server_id}/zones/{zone_id}`, + where `zone_id` is the apex domain (`domain.tld`) + + So, we must be smart about stripping out subdomains as part of the URL passed + to `requests.patch`, but must maintain the FQDN in the `name` field of the `payload`. + """ + + dns_provider_name = "powerdns" + + def __init__(self, powerdns_api_key, powerdns_api_url): + self.powerdns_api_key = powerdns_api_key + self.powerdns_api_url = powerdns_api_url + super(PowerDNSDns, self).__init__() + + def validate_powerdns_zone(self, domain_name): + """ + Walk `domain_name` backwards, trying to find the apex domain. + E.g.: For `fu.bar.baz.domain.com`, `response.status_code` will only be + `200` for `domain.com` + """ + d = "." + count = domain_name.count(d) + + while True: + url = self.powerdns_api_url + "/" + domain_name + response = requests.get(url, headers={"X-API-Key": self.powerdns_api_key}) + + if response.status_code == 200: + return domain_name + elif count <= 0: + raise ValueError( + f"Could not determine apex domain: (count: {count}, domain_name: {domain_name})" + ) + else: + split = domain_name.split(d) + split.pop(0) + domain_name = d.join(split) + count -= 1 + + def _common_dns_record(self, domain_name, domain_dns_value, changetype): + if changetype not in ("REPLACE", "DELETE"): + raise ValueError("changetype is not valid.") + + payload = { + "rrsets": [ + { + "name": "_acme-challenge" + "." + domain_name + ".", + "type": "TXT", + "ttl": 60, + "changetype": changetype, + "records": [{"content": f'"{domain_dns_value}"', "disabled": False}], + } + ] + } + self.logger.debug(f"PowerDNS domain name: {domain_name}") + self.logger.debug(f"PowerDNS payload: {payload}") + + apex_domain = self.validate_powerdns_zone(domain_name) + url = self.powerdns_api_url + "/" + apex_domain + self.logger.debug(f"apex_domain: {apex_domain}") + self.logger.debug(f"url: {url}") + + try: + response = requests.patch( + url, data=json.dumps(payload), headers={"X-API-Key": self.powerdns_api_key} + ) + self.logger.debug(f"PowerDNS response: {response.status_code}, {response.text}") + except requests.exceptions.RequestException as e: + self.logger.error(f"Unable to communicate with PowerDNS API: {e}") + raise + + # Per https://doc.powerdns.com/authoritative/http-api/zone.html: + # PATCH /servers/{server_id}/zones/{zone_id} + # Creates/modifies/deletes RRsets present in the payload and their comments. + # Returns 204 No Content on success. + if response.status_code != 204: + raise ValueError(f"Error creating or deleting PowerDNS record: {response.text}") + + def create_dns_record(self, domain_name, domain_dns_value): + self._common_dns_record(domain_name, domain_dns_value, "REPLACE") + + def delete_dns_record(self, domain_name, domain_dns_value): + self._common_dns_record(domain_name, domain_dns_value, "DELETE") diff --git a/sewer/dns_providers/tests/test_powerdns.py b/sewer/dns_providers/tests/test_powerdns.py new file mode 100644 index 00000000..55703f67 --- /dev/null +++ b/sewer/dns_providers/tests/test_powerdns.py @@ -0,0 +1,101 @@ +import mock +from unittest import TestCase +import sewer + +from . import test_utils + + +class TestPowerDNS(TestCase): + """ + Tests for PowerDNS DNS provider class. + """ + + def setUp(self): + self.domain_name = "example.com" + self.domain_dns_value = "mock-domain_dns_value" + self.powerdns_api_key = "mock-api-key" + self.powerdns_api_url = "https://some-mock-url.com" + + self.common_response = test_utils.MockResponse(status_code=204) + self.apex_response = test_utils.MockResponse(status_code=200) + with mock.patch("requests.patch") as mock_requests_get: + mock_requests_get.return_value = self.common_response + self.dns_class = sewer.PowerDNSDns( + powerdns_api_key=self.powerdns_api_key, powerdns_api_url=self.powerdns_api_url + ) + + def tearDown(self): + pass + + def test_validate_powerdns_zone(self): + fqdn = f"fu.bar.baz.{self.domain_name}" + + with mock.patch("requests.get") as mock_requests_get, mock.patch( + "sewer.PowerDNSDns.validate_powerdns_zone" + ) as mock_validate_powerdns_zone: + + mock_requests_get.return_value.status_code = self.apex_response + mock_validate_powerdns_zone.return_value = self.domain_name + + response = self.dns_class.validate_powerdns_zone(fqdn) + + self.assertEqual(response, self.domain_name) + mock_validate_powerdns_zone.assert_called_with(fqdn) + + def test_could_not_determine_apex_domain(self): + with mock.patch("requests.get") as mock_requests_get: + mock_requests_get.return_value.status_code = 666 + + self.assertRaises( + ValueError, self.dns_class.validate_powerdns_zone, domain_name=self.domain_name + ) + + def test_powerdns_has_correct_changetype(self): + self.assertRaises( + ValueError, + self.dns_class._common_dns_record, + domain_name=self.domain_name, + domain_dns_value=self.domain_dns_value, + changetype="fubar", + ) + + def test_powerdns_returns_correct_status_code(self): + with mock.patch("requests.patch") as mock_requests_patch, mock.patch( + "requests.get" + ) as mock_requests_get: + + mock_requests_get.return_value = self.apex_response + mock_requests_patch.return_value.status_code = 666 + + self.assertRaises( + ValueError, + self.dns_class.create_dns_record, + domain_name=self.domain_name, + domain_dns_value=self.domain_dns_value, + ) + + def test_powerdns_is_called_by_create_dns_record(self): + with mock.patch("requests.patch") as mock_requests_patch, mock.patch( + "sewer.PowerDNSDns.delete_dns_record" + ) as mock_delete_dns_record, mock.patch("requests.get") as mock_requests_get: + + mock_requests_get.return_value = self.apex_response + mock_requests_patch.return_value = ( + mock_delete_dns_record.return_value + ) = self.common_response + + self.dns_class.create_dns_record( + domain_name=self.domain_name, domain_dns_value=self.domain_dns_value + ) + + def test_powerdns_is_called_by_delete_dns_record(self): + with mock.patch("requests.patch") as mock_requests_patch, mock.patch( + "requests.get" + ) as mock_requests_get: + + mock_requests_get.return_value = self.apex_response + mock_requests_patch.return_value = self.common_response + self.dns_class.delete_dns_record( + domain_name=self.domain_name, domain_dns_value=self.domain_dns_value + ) + self.assertTrue(mock_requests_patch.called)