From 7e52fed258530929e126eb3cae19d7fbf794aa86 Mon Sep 17 00:00:00 2001 From: Hubert Kario Date: Wed, 25 Sep 2019 21:10:25 +0200 Subject: [PATCH] support for ECDSA client certificates --- tests/clientECCert.pem | 11 +++ tests/clientECKey.pem | 5 + tests/tlstest.py | 36 ++++++- tlslite/keyexchange.py | 2 +- tlslite/tlsconnection.py | 198 ++++++++++++++++++++++++--------------- 5 files changed, 174 insertions(+), 78 deletions(-) create mode 100644 tests/clientECCert.pem create mode 100644 tests/clientECKey.pem diff --git a/tests/clientECCert.pem b/tests/clientECCert.pem new file mode 100644 index 000000000..c24dea193 --- /dev/null +++ b/tests/clientECCert.pem @@ -0,0 +1,11 @@ +-----BEGIN CERTIFICATE----- +MIIBfjCCASOgAwIBAgIUTxkWwfl73xnqhoJqWZ+R4RPCX54wCgYIKoZIzj0EAwIw +FDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTE5MDkyNTE2NDgzMloXDTI5MDkyMjE2 +NDgzMlowFDESMBAGA1UEAwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0D +AQcDQgAEQNSgjtKQ2KEyIjcamNSSH83huV2aYpJMCRYAXvEBUNygmCwxCod2j97J +Z4rnBmd9ySlceT65nXFuDuKlftIlLaNTMFEwHQYDVR0OBBYEFM9448HkxS8PnjT2 +CRzwBP3SWGK1MB8GA1UdIwQYMBaAFM9448HkxS8PnjT2CRzwBP3SWGK1MA8GA1Ud +EwEB/wQFMAMBAf8wCgYIKoZIzj0EAwIDSQAwRgIhAKCSuUb8eWvb3WCQcA8Wj2av +V8UXsXPk5N2HyDojhklvAiEA97Z8QDFxMg20hm8cPGMlQT1r3PtPwF0eRsL2rPu0 +nxY= +-----END CERTIFICATE----- diff --git a/tests/clientECKey.pem b/tests/clientECKey.pem new file mode 100644 index 000000000..d8aa3c937 --- /dev/null +++ b/tests/clientECKey.pem @@ -0,0 +1,5 @@ +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgjIlFWfrrN8Rsn7RU +9mZsG6hTDEa/1pXw8oNBML7UjeihRANCAARA1KCO0pDYoTIiNxqY1JIfzeG5XZpi +kkwJFgBe8QFQ3KCYLDEKh3aP3slniucGZ33JKVx5PrmdcW4O4qV+0iUt +-----END PRIVATE KEY----- diff --git a/tests/tlstest.py b/tests/tlstest.py index c58bd720a..45470a782 100755 --- a/tests/tlstest.py +++ b/tests/tlstest.py @@ -568,12 +568,27 @@ def connect(): synchro.recv(1) connection = connect() - # TODO add client certificate support in TLS 1.3 - settings = HandshakeSettings() - settings.maxVersion = (3, 3) - connection.handshakeClientCert(x509Chain, x509Key, settings=settings) + connection.handshakeClientCert(x509Chain, x509Key) + testConnClient(connection) + assert isinstance(connection.session.serverCertChain, X509CertChain) + connection.close() + + test_no += 1 + + print("Test {0} - good mutual ECDSA X.509".format(test_no)) + with open(os.path.join(dir, "clientECCert.pem")) as f: + x509Cert = X509().parse(f.read()) + x509Chain = X509CertChain([x509Cert]) + with open(os.path.join(dir, "clientECKey.pem")) as f: + x509Key = parsePEMKey(f.read(), private=True) + + synchro.recv(1) + connection = connect() + connection.handshakeClientCert(x509Chain, x509Key) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) + assert len(connection.session.serverCertChain.getEndEntityPublicKey()) ==\ + 256 connection.close() test_no += 1 @@ -1692,6 +1707,19 @@ def connect(): test_no += 1 + print("Test {0} - good mutual ECDSA X.509".format(test_no)) + synchro.send(b'R') + connection = connect() + connection.handshakeServer(certChain=x509ecdsaChain, + privateKey=x509ecdsaKey, reqCert=True) + testConnServer(connection) + assert(isinstance(connection.session.clientCertChain, X509CertChain)) + assert len(connection.session.clientCertChain.getEndEntityPublicKey()) ==\ + 256 + connection.close() + + test_no += 1 + print("Test {0} - good mutual X.509, TLSv1.3 no certs".format(test_no)) synchro.send(b'R') connection = connect() diff --git a/tlslite/keyexchange.py b/tlslite/keyexchange.py index a9ac8c774..d31b9df96 100644 --- a/tlslite/keyexchange.py +++ b/tlslite/keyexchange.py @@ -336,7 +336,7 @@ def makeCertificateVerify(version, handshakeHashes, validSigAlgs, premasterSecret, clientRandom, serverRandom, - privateKey.key_type) + key_type=privateKey.key_type) if signatureAlgorithm and \ signatureAlgorithm[1] == SignatureAlgorithm.ecdsa: padding = None diff --git a/tlslite/tlsconnection.py b/tlslite/tlsconnection.py index bd4c0713d..8ca80d5dd 100644 --- a/tlslite/tlsconnection.py +++ b/tlslite/tlsconnection.py @@ -1253,6 +1253,7 @@ def _clientTLS13Handshake(self, settings, session, clientHello, assert isinstance(certificate_verify, CertificateVerify) signature_scheme = certificate_verify.signatureAlgorithm + self.serverSigAlg = signature_scheme signature_context = KeyExchange.calcVerifyBytes((3, 4), srv_cert_verify_hh, @@ -1268,20 +1269,27 @@ def _clientTLS13Handshake(self, settings, session, clientHello, publicKey, serverCertChain, tackExt = result if signature_scheme[1] == SignatureAlgorithm.ecdsa: - padType = None - hashName = HashAlgorithm.toRepr(signature_scheme[0]) - saltLen = None + pad_type = None + hash_name = HashAlgorithm.toRepr(signature_scheme[0]) + matching_hash = self._curve_name_to_hash_name( + publicKey.curve_name) + if hash_name != matching_hash: + raise TLSIllegalParameterException( + "server selected signature method invalid for the " + "certificate it presented (curve mismatch)") + + salt_len = None else: scheme = SignatureScheme.toRepr(signature_scheme) - padType = SignatureScheme.getPadding(scheme) - hashName = SignatureScheme.getHash(scheme) - saltLen = getattr(hashlib, hashName)().digest_size + pad_type = SignatureScheme.getPadding(scheme) + hash_name = SignatureScheme.getHash(scheme) + salt_len = getattr(hashlib, hash_name)().digest_size if not publicKey.verify(certificate_verify.signature, signature_context, - padType, - hashName, - saltLen): + pad_type, + hash_name, + salt_len): raise TLSDecryptionFailed("server Certificate Verify " "signature " "verification failed") @@ -1470,7 +1478,7 @@ def _clientTLS13Handshake(self, settings, session, clientHello, bytearray(b''), # no session_id in TLS 1.3 serverHello.cipher_suite, None, # no SRP - None, # no client cert chain + clientCertChain, certificate.cert_chain if certificate else None, None, # no TACK False, # no TACK in hello @@ -1766,15 +1774,13 @@ def _clientFinished(self, premasterSecret, clientRandom, serverRandom, yield result yield masterSecret - def _clientGetKeyFromChain(self, certificate, settings, tackExt=None): - #Get and check cert chain from the Certificate message - cert_chain = certificate.cert_chain - if not cert_chain or cert_chain.getNumCerts() == 0: - for result in self._sendError(AlertDescription.illegal_parameter, - "Other party sent a Certificate message without "\ - "certificates"): - yield result + def _check_certchain_with_settings(self, cert_chain, settings): + """ + Verify that the key parameters match enabled ones. + Checks if the certificate key size matches the minimum and maximum + sizes set or that it uses curves enabled in settings + """ #Get and check public key from the cert chain publicKey = cert_chain.getEndEntityPublicKey() cert_type = cert_chain.x509List[0].certAlg @@ -1826,23 +1832,42 @@ def _clientGetKeyFromChain(self, certificate, settings, tackExt=None): "Other party's public key too large: %d" % len(publicKey)): yield result + yield publicKey + + def _clientGetKeyFromChain(self, certificate, settings, tack_ext=None): + #Get and check cert chain from the Certificate message + cert_chain = certificate.cert_chain + if not cert_chain or cert_chain.getNumCerts() == 0: + for result in self._sendError( + AlertDescription.illegal_parameter, + "Other party sent a Certificate message without "\ + "certificates"): + yield result + + for result in self._check_certchain_with_settings( + cert_chain, + settings): + if result in (0, 1): + yield result + else: break + public_key = result # If there's no TLS Extension, look for a TACK cert if tackpyLoaded: - if not tackExt: - tackExt = cert_chain.getTackExt() + if not tack_ext: + tack_ext = cert_chain.getTackExt() # If there's a TACK (whether via TLS or TACK Cert), check that it # matches the cert chain - if tackExt and tackExt.tacks: - for tack in tackExt.tacks: + if tack_ext and tack_ext.tacks: + for tack in tack_ext.tacks: if not cert_chain.checkTack(tack): for result in self._sendError( AlertDescription.illegal_parameter, "Other party's TACK doesn't match their public key"): yield result - yield publicKey, cert_chain, tackExt + yield public_key, cert_chain, tack_ext #********************************************************* @@ -2701,13 +2726,18 @@ def _serverTLS13Handshake(self, settings, clientHello, cipherSuite, signature_scheme, None, None, None, prf_name, b'client') - scheme = SignatureScheme.toRepr(signature_scheme) - pad_type = SignatureScheme.getPadding(scheme) - hash_name = SignatureScheme.getHash(scheme) - salt_len = getattr(hashlib, hash_name)().digest_size - public_key = client_cert_chain.getEndEntityPublicKey() + if signature_scheme[1] == SignatureAlgorithm.ecdsa: + hash_name = HashAlgorithm.toRepr(signature_scheme[0]) + pad_type = None + salt_len = None + else: + scheme = SignatureScheme.toRepr(signature_scheme) + pad_type = SignatureScheme.getPadding(scheme) + hash_name = SignatureScheme.getHash(scheme) + salt_len = getattr(hashlib, hash_name)().digest_size + if not public_key.verify(certificate_verify.signature, signature_context, pad_type, @@ -3704,7 +3734,8 @@ def _serverCertKeyExchange(self, clientHello, serverHello, if not reqCAs: reqCAs = [] valid_sig_algs = self._sigHashesToList(settings) - certificateRequest.create([ClientCertificateType.rsa_sign], + certificateRequest.create([ClientCertificateType.rsa_sign, + ClientCertificateType.ecdsa_sign], reqCAs, valid_sig_algs) msgs.append(certificateRequest) @@ -3792,46 +3823,56 @@ def _serverCertKeyExchange(self, clientHello, serverHello, "Verify"): yield result signatureAlgorithm = certificateVerify.signatureAlgorithm + if not signatureAlgorithm and \ + clientCertChain.x509List[0].certAlg == "ecdsa": + signatureAlgorithm = (HashAlgorithm.sha1, + SignatureAlgorithm.ecdsa) cvhh = self._certificate_verify_handshake_hash - verifyBytes = KeyExchange.calcVerifyBytes(self.version, - cvhh, - signatureAlgorithm, - premasterSecret, - clientHello.random, - serverHello.random) - publicKey = clientCertChain.getEndEntityPublicKey() - if len(publicKey) < settings.minKeySize: - for result in self._sendError(\ - AlertDescription.handshake_failure, - "Client's public key too small: %d" % len(publicKey)): - yield result - - if len(publicKey) > settings.maxKeySize: - for result in self._sendError(\ - AlertDescription.handshake_failure, - "Client's public key too large: %d" % len(publicKey)): + verify_bytes = KeyExchange.calcVerifyBytes( + self.version, + cvhh, + signatureAlgorithm, + premasterSecret, + clientHello.random, + serverHello.random, + key_type=clientCertChain.x509List[0].certAlg) + + for result in self._check_certchain_with_settings( + clientCertChain, + settings): + if result in (0, 1): yield result - - scheme = SignatureScheme.toRepr(signatureAlgorithm) - # for pkcs1 signatures hash is used to add PKCS#1 prefix, but - # that was already done by calcVerifyBytes - hashName = None - saltLen = 0 - if scheme is None: - padding = 'pkcs1' + else: break + public_key = result + + if not signatureAlgorithm or \ + signatureAlgorithm[1] != SignatureAlgorithm.ecdsa: + scheme = SignatureScheme.toRepr(signatureAlgorithm) + # for pkcs1 signatures hash is used to add PKCS#1 prefix, but + # that was already done by calcVerifyBytes + hash_name = None + salt_len = 0 + if scheme is None: + padding = 'pkcs1' + else: + padding = SignatureScheme.getPadding(scheme) + if padding == 'pss': + hash_name = SignatureScheme.getHash(scheme) + salt_len = getattr(hashlib, hash_name)().digest_size else: - padding = SignatureScheme.getPadding(scheme) - if padding == 'pss': - hashName = SignatureScheme.getHash(scheme) - saltLen = getattr(hashlib, hashName)().digest_size - - if not publicKey.verify(certificateVerify.signature, - verifyBytes, - padding, - hashName, - saltLen): - for result in self._sendError(\ + hash_name = HashAlgorithm.toStr(signatureAlgorithm[0]) + verify_bytes = verify_bytes[ + :public_key.public_key.curve.baselen] + padding = None + salt_len = None + + if not public_key.verify(certificateVerify.signature, + verify_bytes, + padding, + hash_name, + salt_len): + for result in self._sendError( AlertDescription.decrypt_error, "Signature failed to verify"): yield result @@ -4090,14 +4131,10 @@ def _sigHashesToList(settings, privateKey=None, certList=None, # in TLS 1.3 ECDSA key curve is bound to hash if publicKey and version > (3, 3): - size = len(publicKey) - size, r = divmod(size, 8) - size += int(bool(r)) - if size == 32 and hashName != "sha256": - continue - if size == 48 and hashName != "sha384": - continue - if size == 65 and hashName != "sha512": + curve = publicKey.curve_name + matching_hash = TLSConnection._curve_name_to_hash_name( + curve) + if hashName != matching_hash: continue sigAlgs.append((getattr(HashAlgorithm, hashName), @@ -4146,3 +4183,18 @@ def _curveNamesToList(settings): def _groupNamesToList(settings): """Convert list of acceptable ff groups to TLS identifiers.""" return [getattr(GroupName, val) for val in settings.dhGroups] + + @staticmethod + def _curve_name_to_hash_name(curve_name): + """Returns the matching hash for a given curve name, for TLS 1.3 + + expects the python-ecdsa curve names as parameter + """ + if curve_name == "NIST256p": + return "sha256" + if curve_name == "NIST384p": + return "sha384" + if curve_name == "NIST521p": + return "sha512" + raise TLSIllegalParameterException( + "Curve {0} is not supported in TLS 1.3".format(curve_name))