From 8b30cad977d44705d492ad0c29eff2bcb0255dcd Mon Sep 17 00:00:00 2001 From: Rob Crittenden Date: Fri, 27 Oct 2023 10:50:21 -0400 Subject: [PATCH] Support validating LWCA certmonger requests The LWCA ids are UUID4 format and are stored in LDAP so we can retrieve the list (ignoring the ipa entry) and construct what the request should look like. Add a cache for the get_expected_requests() function. The certificates aren't going to change (or shouldn't) in the middle of a run and there is no point in duplicating several LDAP requests for each call. Fixes: https://github.com/freeipa/freeipa-healthcheck/issues/307 Signed-off-by: Rob Crittenden --- src/ipahealthcheck/ipa/certs.py | 65 +++++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/src/ipahealthcheck/ipa/certs.py b/src/ipahealthcheck/ipa/certs.py index 76c4cc51..bc056d7c 100644 --- a/src/ipahealthcheck/ipa/certs.py +++ b/src/ipahealthcheck/ipa/certs.py @@ -35,6 +35,8 @@ logger = logging.getLogger() DAY = 60 * 60 * 24 +expected_requests = [] # to cache the expected certmonger requests + def is_ipa_issued_cert(myapi, cert): """Thin wrapper around certs.is_ipa_issued to test for LDAP""" @@ -44,7 +46,7 @@ def is_ipa_issued_cert(myapi, cert): return certs.is_ipa_issued_cert(myapi, cert) -def get_expected_requests(ca, ds, serverid): +def get_expected_requests(ca, ds, serverid, conn): """Provide the expected certmonger tracking request data This list is based in part on certificate_renewal_update() in @@ -54,10 +56,15 @@ def get_expected_requests(ca, ds, serverid): The list is filtered depending on whether a CA is running and the certificates have been issued by IPA. - :param ca: the CAInstance - :param ds: the DSInstance - :param serverid: the DS serverid name + :param ca: the CAInstance + :param ds: the DSInstance + :param serverid: the DS serverid name """ + global expected_requests + + if expected_requests: + return expected_requests + template = paths.CERTMONGER_COMMAND_TEMPLATE if api.Command.ca_is_enabled()['result']: @@ -100,6 +107,7 @@ def get_expected_requests(ca, ds, serverid): if token and token != 'internal': req['key-token'] = token requests.append(req) + requests.extend(get_lwca_requests(conn)) else: logger.debug('CA is not configured, skipping CA tracking') @@ -174,9 +182,49 @@ def get_expected_requests(ca, ds, serverid): } ) + expected_requests = requests + return requests +def get_lwca_requests(conn): + """ + Retrieve the known LWCA certificates. + """ + lwca_requests = [] + try: + entries, truncated = conn.find_entries( + base_dn=DN(api.env.container_ca, api.env.basedn), + filter="(&(objectclass=ipaca)(!(cn=ipa)))", + ) + except (errors.EmptyResult, errors.NotFound) as e: + logger.warn("Search for LWCA entries failed: %s", e) + return lwca_requests + + template = paths.CERTMONGER_COMMAND_TEMPLATE + for entry in entries: + try: + id = entry.get('ipacaid')[0] + except Exception as e: + logger.debug("Malformed LDAP entry, unable to get LWCA id %s: %s", + entry.get('dn'), e) + continue + nickname = f'caSigningCert cert-pki-ca {id}' + + req = { + 'cert-database': paths.PKI_TOMCAT_ALIAS_DIR, + 'cert-nickname': nickname, + 'ca-name': RENEWAL_CA_NAME, + 'cert-presave-command': template % 'stop_pkicad', + 'cert-postsave-command': + (template % 'renew_ca_cert "{}"'.format(nickname)), + 'template-profile': 'caCACert', + } + lwca_requests.append(req) + + return lwca_requests + + def expected_token(token_name, certmonger_token): """The value is stored in two places, do some sanity checking""" if token_name != certmonger_token: @@ -426,7 +474,8 @@ class IPACertTracking(IPAPlugin): @duration def check(self): - requests = get_expected_requests(self.ca, self.ds, self.serverid) + requests = get_expected_requests(self.ca, self.ds, self.serverid, + self.conn) cm = certmonger._certmonger() ids = [] @@ -495,7 +544,8 @@ class IPACertDNSSAN(IPAPlugin): @duration def check(self): fqdn = socket.getfqdn() - requests = get_expected_requests(self.ca, self.ds, self.serverid) + requests = get_expected_requests(self.ca, self.ds, self.serverid, + self.conn) for request in requests: request_id = certmonger.get_request_id(request) @@ -1225,7 +1275,8 @@ def check(self): if not self.ca.is_configured(): logger.debug('CA is not configured, skipping revocation check') return - requests = get_expected_requests(self.ca, self.ds, self.serverid) + requests = get_expected_requests(self.ca, self.ds, self.serverid, + self.conn) pwd_file = get_token_password_file(self.ca.hsm_enabled, self.ca.token_name) for request in requests: