Skip to content

Commit

Permalink
refactor(formating and typing): add more type hints [DNS-1270] (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
zak905 committed Apr 15, 2024
1 parent ecc8a7f commit 971a960
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions certbot_dns_ionos/ionos.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from certbot import errors
from certbot.plugins import dns_common
from typing import Any

logger = logging.getLogger(__name__)

Expand All @@ -24,34 +25,35 @@ def __init__(self, *args, **kwargs):
self.credentials = None

@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
def add_parser_arguments(cls, add):
super(Authenticator, cls).add_parser_arguments(
add, default_propagation_seconds=120
)
add("credentials", help="credentials INI file.")

def more_info(self): # pylint: disable=missing-docstring,no-self-use
def more_info(self) -> str:
return (
"This plugin configures a DNS TXT record to respond to a dns-01"
+ " challenge using the IONOS REST API."
)

def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
"credentials",
"IONOS API credentials INI file. Only Bearer token"
+ " authentication is supported",
{"token": "access token for the IONOS API"},
)

def _perform(self, domain, validation_name, validation):
self._get_ionos_client().add_txt_record(domain, validation_name, validation)

def _cleanup(self, domain, validation_name, validation):
self._get_ionos_client().del_txt_record(domain, validation_name, validation)
def _perform(self, domain, validation_name, validation) -> None:
_IONOSClient(self.credentials.conf("token")).add_txt_record(
domain, validation_name, validation
)

def _get_ionos_client(self):
return _IONOSClient(self.credentials.conf("token"))
def _cleanup(self, domain, validation_name, validation) -> None:
_IONOSClient(self.credentials.conf("token")).del_txt_record(
domain, validation_name, validation
)


class _IONOSClient(object):
Expand All @@ -63,7 +65,7 @@ def __init__(self, token: str):
logger.debug("creating IONOS Client")
self.headers = {"Authorization": f"Bearer {token}"}

def _handle_response(self, resp: requests.Response):
def _handle_response(self, resp: requests.Response) -> Any:
if resp.status_code != 200 and resp.status_code != 202:
raise errors.PluginError(
"Received non OK status from IONOS API {0}".format(resp.status_code)
Expand All @@ -73,7 +75,7 @@ def _handle_response(self, resp: requests.Response):
except json.decoder.JSONDecodeError:
raise errors.PluginError("API response with non JSON: {0}".format(resp.text))

def add_txt_record(self, domain, record_name: str, record_content):
def add_txt_record(self, domain: str, record_name: str, record_content: str):
"""
Add a TXT record using the supplied information.
Expand Down Expand Up @@ -106,7 +108,7 @@ def add_txt_record(self, domain, record_name: str, record_content):
logger.info("insert new txt record")
self._insert_txt_record(zone_id, record_name_without_domain, record_content)

def del_txt_record(self, domain, record_name: str, record_content: str):
def del_txt_record(self, domain: str, record_name: str, record_content: str):
"""
Delete a TXT record using the supplied information.
:param str domain: The domain to use to look up the managed zone.
Expand Down Expand Up @@ -135,7 +137,7 @@ def del_txt_record(self, domain, record_name: str, record_content: str):
)
)

def _insert_txt_record(self, zone_id, record_name, record_content):
def _insert_txt_record(self, zone_id: str, record_name: str, record_content: str):
new_record = {
"properties": {
"name": record_name,
Expand All @@ -153,7 +155,7 @@ def _insert_txt_record(self, zone_id, record_name, record_content):
)
logger.debug("create with payload: %s", new_record)

def _update_txt_record(self, zone_id, record):
def _update_txt_record(self, zone_id: str, record: dict):
self._handle_response(
requests.put(
f"{dns_api_base_url}/zones/{zone_id}/records/{record.get('id')}",
Expand All @@ -163,7 +165,7 @@ def _update_txt_record(self, zone_id, record):
)
logger.debug("update with payload: %s", record)

def _find_zone_id(self, domain):
def _find_zone_id(self, domain: str) -> str | None:
"""
Find the zone for a given domain.
Expand All @@ -188,7 +190,7 @@ def _find_zone_id(self, domain):

return None

def get_existing_txt_acme_record(self, zone_id, record_name):
def get_existing_txt_acme_record(self, zone_id: str, record_name: str) -> Any | None:
"""
Get existing TXT records for the record name.
Expand Down

0 comments on commit 971a960

Please sign in to comment.