From 44f656d9f64a305ae9c83c41c8a9610608ca6233 Mon Sep 17 00:00:00 2001 From: Hubert Kario Date: Fri, 20 Sep 2019 15:52:21 +0200 Subject: [PATCH 01/10] parsing of ECDSA keys from X.509 certificates --- tlslite/utils/ecdsakey.py | 181 ++++++++++++++++++ tlslite/utils/keyfactory.py | 28 +++ tlslite/utils/python_ecdsakey.py | 99 ++++++++++ tlslite/x509.py | 44 ++++- .../test_tlslite_utils_python_ecdsakey.py | 37 ++++ unit_tests/test_tlslite_x509.py | 38 ++++ 6 files changed, 423 insertions(+), 4 deletions(-) create mode 100644 tlslite/utils/ecdsakey.py create mode 100644 tlslite/utils/python_ecdsakey.py create mode 100644 unit_tests/test_tlslite_utils_python_ecdsakey.py create mode 100644 unit_tests/test_tlslite_x509.py diff --git a/tlslite/utils/ecdsakey.py b/tlslite/utils/ecdsakey.py new file mode 100644 index 000000000..3c9128a80 --- /dev/null +++ b/tlslite/utils/ecdsakey.py @@ -0,0 +1,181 @@ +# Author: Stanislav Zidek +# See the LICENSE file for legal information regarding use of this file. + +"""Abstract class for ECDSA.""" + +from .cryptomath import * + + +class ECDSAKey(object): + """This is an abstract base class for ECDSA keys. + + Particular implementations of ECDSA keys, such as + :py:class:`~.python_ecdsakey.Python_ECDSAKey` + ... more coming + inherit from this. + + To create or parse an ECDSA key, don't use one of these classes + directly. Instead, use the factory functions in + :py:class:`~tlslite.utils.keyfactory`. + """ + + def __init__(self, public_key, private_key): + """Create a new ECDSA key. + + If public_key or private_key are passed in, the new key + will be initialized. + + :param public_key: ECDSA public key. + + :param private_key: ECDSA private key. + """ + raise NotImplementedError() + + def __len__(self): + """Return the size of the order of the curve of this key, in bits. + + :rtype: int + """ + raise NotImplementedError() + + def hasPrivateKey(self): + """Return whether or not this key has a private component. + + :rtype: bool + """ + raise NotImplementedError() + + def _sign(self, data): + raise NotImplementedError() + + def _hashAndSign(self, data, hAlg): + raise NotImplementedError() + + def _verify(self, signature, hash_bytes): + raise NotImplementedError() + + def hashAndSign(self, bytes, rsaScheme=None, hAlg='sha1', sLen=None): + """Hash and sign the passed-in bytes. + + This requires the key to have a private component. It performs + a signature on the passed-in data with selected hash algorithm. + + :type bytes: bytes-like object + :param bytes: The value which will be hashed and signed. + + :type rsaScheme: str + :param rsaScheme: Ignored, present for API compatibility with RSA + + :type hAlg: str + :param hAlg: The hash algorithm that will be used to hash data + + :type sLen: int + :param sLen: Ignored, present for API compatibility with RSA + + :rtype: bytearray + :returns: An ECDSA signature on the passed-in data. + """ + hAlg = hAlg.lower() + hashBytes = secureHash(bytearray(bytes), hAlg) + return self.sign(hashBytes, padding=rsaScheme, hashAlg=hAlg, + saltLen=sLen) + + def hashAndVerify(self, sigBytes, bytes, rsaScheme=None, hAlg='sha1', + sLen=None): + """Hash and verify the passed-in bytes with the signature. + + This verifies an ECDSA signature on the passed-in data + with selected hash algorithm. + + :type sigBytes: bytearray + :param sigBytes: An ECDSA signature, DER encoded. + + :type bytes: str or bytearray + :param bytes: The value which will be hashed and verified. + + :type rsaScheme: str + :param rsaScheme: Ignored, present for API compatibility with RSA + + :type hAlg: str + :param hAlg: The hash algorithm that will be used + + :type sLen: int + :param sLen: Ignored, present for API compatibility with RSA + + :rtype: bool + :returns: Whether the signature matches the passed-in data. + """ + hAlg = hAlg.lower() + + hashBytes = secureHash(bytearray(bytes), hAlg) + return self.verify(sigBytes, hashBytes, rsaScheme, hAlg, sLen) + + def sign(self, bytes, padding=None, hashAlg="sha1", saltLen=None): + """Sign the passed-in bytes. + + This requires the key to have a private component. It performs + an ECDSA signature on the passed-in data. + + :type bytes: bytearray + :param bytes: The value which will be signed (generally a binary + encoding of hash output. + + :type padding: str + :param padding: Ignored, present for API compatibility with RSA + + :type hashAlg: str + :param hashAlg: name of hash that was used for calculating the bytes + + :type saltLen: int + :param saltLen: Ignored, present for API compatibility with RSA + + :rtype: bytearray + :returns: An ECDSA signature on the passed-in data. + """ + sigBytes = self._sign(bytes, hashAlg) + return sigBytes + + def verify(self, sigBytes, bytes, padding=None, hashAlg=None, + saltLen=None): + """Verify the passed-in bytes with the signature. + + This verifies a PKCS1 signature on the passed-in data. + + :type sigBytes: bytearray + :param sigBytes: A PKCS1 signature. + + :type bytes: bytearray + :param bytes: The value which will be verified. + + :type padding: str + :param padding: Ignored, present for API compatibility with RSA + + :rtype: bool + :returns: Whether the signature matches the passed-in data. + """ + return self._verify(sigBytes, bytes) + + def acceptsPassword(self): + """Return True if the write() method accepts a password for use + in encrypting the private key. + + :rtype: bool + """ + raise NotImplementedError() + + def write(self, password=None): + """Return a string containing the key. + + :rtype: str + :returns: A string describing the key, in whichever format (PEM) + is native to the implementation. + """ + raise NotImplementedError() + + @staticmethod + def generate(bits): + """Generate a new key with the specified bit length. + + :rtype: ~tlslite.utils.ECDSAKey.ECDSAKey + """ + raise NotImplementedError() diff --git a/tlslite/utils/keyfactory.py b/tlslite/utils/keyfactory.py index d66afc80f..9d523c120 100644 --- a/tlslite/utils/keyfactory.py +++ b/tlslite/utils/keyfactory.py @@ -7,6 +7,7 @@ from .rsakey import RSAKey from .python_rsakey import Python_RSAKey +from .python_ecdsakey import Python_ECDSAKey from tlslite.utils import cryptomath if cryptomath.m2cryptoLoaded: @@ -193,3 +194,30 @@ def _createPrivateRSAKey(n, e, d, p, q, dP, dQ, qInv, key_type, key_type=key_type) raise ValueError("No acceptable implementations") # pylint: enable=invalid-name + + +def _create_public_ecdsa_key(point_x, point_y, curve_name, + implementations=("python",)): + """ + Convert public key parameters into concrete implementation of verifier. + + The public key in ECDSA is a point on elliptic curve, so it consists of + two integers that identify the point and the name of the curve on which + it needs to lie on. + + :type point_x: int + :param point_x: the 'x' coordinate of the point + :type point_y: int + :param point_y: the 'y' coordinate of the point + :type curve_name: str + :param curve_name: well known name of the curve (e.g. 'NIST256p' or + 'SECP256k1') + :type implementations: iterable of str + :param implementations: list of implementations that can be used as the + concrete implementation of the verifying key (only 'python' is + supported currently) + """ + for impl in implementations: + if impl == "python": + return Python_ECDSAKey(point_x, point_y, curve_name) + raise ValueError("No acceptable implementation") diff --git a/tlslite/utils/python_ecdsakey.py b/tlslite/utils/python_ecdsakey.py new file mode 100644 index 000000000..430068aff --- /dev/null +++ b/tlslite/utils/python_ecdsakey.py @@ -0,0 +1,99 @@ +# Author Hubert Kario, copyright 2019 + +from .ecdsakey import ECDSAKey +from ecdsa.curves import curves +from ecdsa.util import sigencode_der, sigdecode_der +from ecdsa.keys import VerifyingKey, SigningKey +from ecdsa.ellipticcurve import Point +from .tlshashlib import new, md5, sha1, sha224, sha256, sha384, sha512 +from .cryptomath import numBits + +class Python_ECDSAKey(ECDSAKey): + """ + Concrete implementation of ECDSA object backed by python-ecdsa. + + Object that uses the common, abstract API of asymmetric keys + that uses the python-ecdsa library for the cryptographic operations. + + :vartype public_key: VerifyingKey + :ivar public_key: python-ecdsa object for veryfying ECDSA signatures, if + `private_key` is set, it should match it (should be able to verify + signatures created by it) + + :vartype private_key: SigningKey + :ivar private_key: python-ecdsa object for creating ECDSA signatures + + :vartype key_type: str + :ivar key_type: type of assymetric algorithm used by the keys - for this + objects it is always 'ecdsa' + """ + + def __init__(self, x, y, curve_name, secret_multiplier=None): + if not curve_name: + raise ValueError("curve_name must be specified") + + for c in curves: + if c.name == curve_name or c.openssl_name == curve_name: + curve = c + break + else: + raise ValueError("Curve '{0}' not supported by python-ecdsa" + .format(curve_name)) + + self.private_key = None + self.public_key = None + self.key_type = "ecdsa" + + if secret_multiplier: + self.private_key = SigningKey.from_secret_exponent( + secret_multiplier, curve) + + if x and y: + point = Point(curve.curve, x, y) + self.public_key = VerifyingKey.from_public_point(point, curve) + + if not self.public_key: + self.public_key = self.private_key.get_verifying_key() + + def __len__(self): + return numBits(self.public_key.curve.order) + + def hasPrivateKey(self): + return bool(self.private_key) + + def acceptsPassword(self): + return False + + @staticmethod + def generate(bits): + raise NotImplementedError() + + def _sign(self, data, hAlg): + if hAlg == "md5": + func = md5 + elif hAlg == "sha1": + func = sha1 + elif hAlg == "sha224": + func = sha224 + elif hAlg == "sha256": + func = sha256 + elif hAlg == "sha384": + func = sha384 + else: + assert hAlg == "sha512", hAlg + func = sha512 + + return self.private_key.\ + sign_digest_deterministic(data, + hashfunc=func, + sigencode=sigencode_der) + + def _hashAndSign(self, data, hAlg): + return self.private_key.sign_deterministic(data, + hash=new(hAlg), + sigencode=sigencode_der) + + def _verify(self, signature, hash_bytes): + return self.public_key.verify_digest(signature, hash_bytes, + sigdecode_der) + diff --git a/tlslite/x509.py b/tlslite/x509.py index c919d9630..18bb0cbac 100644 --- a/tlslite/x509.py +++ b/tlslite/x509.py @@ -6,9 +6,11 @@ """Class representing an X.509 certificate.""" +from ecdsa.keys import VerifyingKey + from .utils.asn1parser import ASN1Parser from .utils.cryptomath import * -from .utils.keyfactory import _createPublicRSAKey +from .utils.keyfactory import _createPublicRSAKey, _create_public_ecdsa_key from .utils.pem import * @@ -90,11 +92,13 @@ def parseBinary(self, bytes): # first item of AlgorithmIdentifier is the algorithm alg = alg_identifier.getChild(0) - rsa_oid = alg.value - if list(rsa_oid) == [42, 134, 72, 134, 247, 13, 1, 1, 1]: + alg_oid = alg.value + if list(alg_oid) == [42, 134, 72, 134, 247, 13, 1, 1, 1]: self.certAlg = "rsa" - elif list(rsa_oid) == [42, 134, 72, 134, 247, 13, 1, 1, 10]: + elif list(alg_oid) == [42, 134, 72, 134, 247, 13, 1, 1, 10]: self.certAlg = "rsa-pss" + elif list(alg_oid) == [42, 134, 72, 206, 61, 2, 1]: + self.certAlg = "ecdsa" else: raise SyntaxError("Unrecognized AlgorithmIdentifier") @@ -106,9 +110,22 @@ def parseBinary(self, bytes): if params.value != bytearray(0): raise SyntaxError("Unexpected non-NULL parameters in " "AlgorithmIdentifier") + elif self.certAlg == "ecdsa": + self._ecdsa_pubkey_parsing( + tbs_certificate.getChildBytes(subject_public_key_info_index)) + return else: # rsa-pss pass # ignore parameters, if any - don't apply key restrictions + self._rsa_pubkey_parsing(subject_public_key_info) + + def _rsa_pubkey_parsing(self, subject_public_key_info): + """ + Parse the RSA public key from the certificate. + + :param subject_public_key_info: ASN1Parser object with subject + public key info of X.509 certificate + """ # Get the subjectPublicKey subject_public_key = subject_public_key_info.getChild(1) @@ -135,6 +152,25 @@ def parseBinary(self, bytes): self.publicKey = _createPublicRSAKey(n, e, self.certAlg) # pylint: enable=invalid-name + def _ecdsa_pubkey_parsing(self, subject_public_key_info): + """ + Convert the raw DER encoded ECDSA parameters into public key object + + :param subject_public_key_info: bytes like object with DER encoded + public key in it + """ + try: + # python ecdsa knows how to parse curve OIDs so re-use that + # code + public_key = VerifyingKey.from_der(subject_public_key_info) + except Exception: + raise SyntaxError("Malformed or unsupported public key in " + "certificate") + x = public_key.pubkey.point.x() + y = public_key.pubkey.point.y() + curve_name = public_key.curve.name + self.publicKey = _create_public_ecdsa_key(x, y, curve_name) + def getFingerprint(self): """ Get the hex-encoded fingerprint of this certificate. diff --git a/unit_tests/test_tlslite_utils_python_ecdsakey.py b/unit_tests/test_tlslite_utils_python_ecdsakey.py new file mode 100644 index 000000000..6e7997dc6 --- /dev/null +++ b/unit_tests/test_tlslite_utils_python_ecdsakey.py @@ -0,0 +1,37 @@ + +try: + import unittest2 as unittest +except ImportError: + import unittest + +try: + import mock + from mock import call +except ImportError: + import unittest.mock as mock + from unittest.mock import call + +from tlslite.utils.python_key import Python_Key +from tlslite.utils.python_rsakey import Python_RSAKey +from tlslite.utils.python_ecdsakey import Python_ECDSAKey + +class TestECDSAKey(unittest.TestCase): + + def test_python_ecdsa_fields(self): + key = ( + "-----BEGIN EC PRIVATE KEY-----\n" + "MHcCAQEEIAjma9Dr7NHgpoflzEFg2FabEPrCXY4qv4raf5GJ1jUmoAoGCCqGSM49\n" + "AwEHoUQDQgAEyDRjEAJe3F5T62MyZbhjoJnPLGL2nrTthLFymBupZ2IbnWYnqVWD\n" + "kT/L6i8sQhf2zCLrlSjj1kn7ERqPx/KZyg==\n" + "-----END EC PRIVATE KEY-----\n") + + parsed_key = Python_Key.parsePEM(key) + + self.assertIsInstance(parsed_key, Python_ECDSAKey) + self.assertTrue(parsed_key.hasPrivateKey()) + self.assertFalse(parsed_key.acceptsPassword()) + self.assertEqual(len(parsed_key), 256) + + def test_generate(self): + with self.assertRaises(NotImplementedError): + Python_ECDSAKey.generate(256) diff --git a/unit_tests/test_tlslite_x509.py b/unit_tests/test_tlslite_x509.py new file mode 100644 index 000000000..7f0f05b76 --- /dev/null +++ b/unit_tests/test_tlslite_x509.py @@ -0,0 +1,38 @@ +try: + import unittest2 as unittest +except ImportError: + import unittest + +try: + import mock + from mock import call +except ImportError: + import unittest.mock as mock + from unittest.mock import call + +from tlslite.x509 import X509 +from tlslite.utils.python_ecdsakey import Python_ECDSAKey + +class TestX509(unittest.TestCase): + def test_pem(self): + data = ( + "-----BEGIN CERTIFICATE-----\n" + "MIIBbTCCARSgAwIBAgIJAPM58cskyK+yMAkGByqGSM49BAEwFDESMBAGA1UEAwwJ\n" + "bG9jYWxob3N0MB4XDTE3MTAyMzExNDI0MVoXDTE3MTEyMjExNDI0MVowFDESMBAG\n" + "A1UEAwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEyDRjEAJe\n" + "3F5T62MyZbhjoJnPLGL2nrTthLFymBupZ2IbnWYnqVWDkT/L6i8sQhf2zCLrlSjj\n" + "1kn7ERqPx/KZyqNQME4wHQYDVR0OBBYEFPfFTUg9o3t6ehLsschSnC8Te8oaMB8G\n" + "A1UdIwQYMBaAFPfFTUg9o3t6ehLsschSnC8Te8oaMAwGA1UdEwQFMAMBAf8wCQYH\n" + "KoZIzj0EAQNIADBFAiA6p0YM5ZzfW+klHPRU2r13/IfKgeRfDR3dtBngmPvxUgIh\n" + "APTeSDeJvYWVBLzyrKTeSerNDKKHU2Rt7sufipv76+7s\n" + "-----END CERTIFICATE-----\n") + x509 = X509() + x509.parse(data) + + self.assertIsNotNone(x509.publicKey) + self.assertIsInstance(x509.publicKey, Python_ECDSAKey) + self.assertEqual(x509.publicKey.public_key.pubkey.point.x(), + 90555129468518880658937518803653422065597446465131062487534800201457796212578) + self.assertEqual(x509.publicKey.public_key.pubkey.point.y(), + 12490546948316647166662676770106859255378658810545502161335656899238893361610) + From 74346eb52f11ca987b2d0740bd4c9c15963bad41 Mon Sep 17 00:00:00 2001 From: Hubert Kario Date: Fri, 20 Sep 2019 17:48:18 +0200 Subject: [PATCH 02/10] verifying ECDSA signatures in SKE --- tlslite/keyexchange.py | 24 ++ tlslite/messages.py | 10 +- tlslite/utils/python_ecdsakey.py | 9 +- unit_tests/test_tlslite_keyexchange.py | 358 +++++++++++++++++++++++++ unit_tests/test_tlslite_messages.py | 15 ++ 5 files changed, 411 insertions(+), 5 deletions(-) diff --git a/tlslite/keyexchange.py b/tlslite/keyexchange.py index 5d2d93b7c..2af841849 100644 --- a/tlslite/keyexchange.py +++ b/tlslite/keyexchange.py @@ -133,6 +133,24 @@ def signServerKeyExchange(self, serverKeyExchange, sigHash=None): else: self._tls12_signSKE(serverKeyExchange, sigHash) + @staticmethod + def _tls12_verify_ecdsa_SKE(serverKeyExchange, publicKey, clientRandom, + serverRandom, validSigAlgs): + hashName = HashAlgorithm.toRepr(serverKeyExchange.hashAlg) + if not hashName: + raise TLSIllegalParameterException("Unknown hash algorithm") + + hashBytes = serverKeyExchange.hash(clientRandom, serverRandom) + + hashBytes = hashBytes[:publicKey.public_key.curve.baselen] + + if not publicKey.verify(serverKeyExchange.signature, hashBytes, + padding=None, + hashAlg=hashName, + saltLen=None): + raise TLSDecryptionFailed("Server Key Exchange signature " + "invalid") + @staticmethod def _tls12_verify_SKE(serverKeyExchange, publicKey, clientRandom, serverRandom, validSigAlgs): @@ -142,6 +160,12 @@ def _tls12_verify_SKE(serverKeyExchange, publicKey, clientRandom, raise TLSIllegalParameterException("Server selected " "invalid signature " "algorithm") + if serverKeyExchange.signAlg == SignatureAlgorithm.ecdsa: + return KeyExchange._tls12_verify_ecdsa_SKE(serverKeyExchange, + publicKey, + clientRandom, + serverRandom, + validSigAlgs) schemeID = (serverKeyExchange.hashAlg, serverKeyExchange.signAlg) scheme = SignatureScheme.toRepr(schemeID) diff --git a/tlslite/messages.py b/tlslite/messages.py index 47fb0fb60..9df8e7320 100644 --- a/tlslite/messages.py +++ b/tlslite/messages.py @@ -1488,7 +1488,8 @@ def parse(self, parser): else: raise AssertionError() - if self.cipherSuite in CipherSuite.certAllSuites: + if self.cipherSuite in CipherSuite.certAllSuites or\ + self.cipherSuite in CipherSuite.ecdheEcdsaSuites: if self.version == (3, 3): self.hashAlg = parser.get(1) self.signAlg = parser.get(1) @@ -1536,7 +1537,8 @@ def write(self): """ writer = Writer() writer.bytes += self.writeParams() - if self.cipherSuite in CipherSuite.certAllSuites: + if self.cipherSuite in CipherSuite.certAllSuites or \ + self.cipherSuite in CipherSuite.ecdheEcdsaSuites: if self.version >= (3, 3): assert self.hashAlg != 0 and self.signAlg != 0 writer.add(self.hashAlg, 1) @@ -1561,6 +1563,10 @@ def hash(self, clientRandom, serverRandom): else: hashAlg = SignatureScheme.getHash(sigScheme) return secureHash(bytesToHash, hashAlg) + # ECDSA ciphers in TLS 1.1 and earlier sign the messages using + # SHA-1 only + if self.cipherSuite in CipherSuite.ecdheEcdsaSuites: + return SHA1(bytesToHash) return MD5(bytesToHash) + SHA1(bytesToHash) diff --git a/tlslite/utils/python_ecdsakey.py b/tlslite/utils/python_ecdsakey.py index 430068aff..8a0e0b7fc 100644 --- a/tlslite/utils/python_ecdsakey.py +++ b/tlslite/utils/python_ecdsakey.py @@ -3,7 +3,7 @@ from .ecdsakey import ECDSAKey from ecdsa.curves import curves from ecdsa.util import sigencode_der, sigdecode_der -from ecdsa.keys import VerifyingKey, SigningKey +from ecdsa.keys import VerifyingKey, SigningKey, BadSignatureError from ecdsa.ellipticcurve import Point from .tlshashlib import new, md5, sha1, sha224, sha256, sha384, sha512 from .cryptomath import numBits @@ -94,6 +94,9 @@ def _hashAndSign(self, data, hAlg): sigencode=sigencode_der) def _verify(self, signature, hash_bytes): - return self.public_key.verify_digest(signature, hash_bytes, - sigdecode_der) + try: + return self.public_key.verify_digest(signature, hash_bytes, + sigdecode_der) + except BadSignatureError: + return False diff --git a/unit_tests/test_tlslite_keyexchange.py b/unit_tests/test_tlslite_keyexchange.py index 8f23f7bbc..25d28f208 100644 --- a/unit_tests/test_tlslite_keyexchange.py +++ b/unit_tests/test_tlslite_keyexchange.py @@ -36,6 +36,7 @@ from tlslite.extensions import SupportedGroupsExtension, SNIExtension from tlslite.utils.ecc import getCurveByName, decodeX962Point, encodeX962Point,\ getPointByteSize +from tlslite.utils.compat import a2b_hex import ecdsa from operator import mul try: @@ -279,6 +280,363 @@ def test_verifyServerKeyExchange_with_damaged_signature_in_TLS1_1(self): bytearray(32), None) + +class TestServerKeyExchangeP256(unittest.TestCase): + @classmethod + def setUpClass(cls): + certificate = ( + "-----BEGIN CERTIFICATE-----\n" + "MIIBbTCCARSgAwIBAgIJAPM58cskyK+yMAkGByqGSM49BAEwFDESMBAGA1UEAwwJ\n" + "bG9jYWxob3N0MB4XDTE3MTAyMzExNDI0MVoXDTE3MTEyMjExNDI0MVowFDESMBAG\n" + "A1UEAwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEyDRjEAJe\n" + "3F5T62MyZbhjoJnPLGL2nrTthLFymBupZ2IbnWYnqVWDkT/L6i8sQhf2zCLrlSjj\n" + "1kn7ERqPx/KZyqNQME4wHQYDVR0OBBYEFPfFTUg9o3t6ehLsschSnC8Te8oaMB8G\n" + "A1UdIwQYMBaAFPfFTUg9o3t6ehLsschSnC8Te8oaMAwGA1UdEwQFMAMBAf8wCQYH\n" + "KoZIzj0EAQNIADBFAiA6p0YM5ZzfW+klHPRU2r13/IfKgeRfDR3dtBngmPvxUgIh\n" + "APTeSDeJvYWVBLzyrKTeSerNDKKHU2Rt7sufipv76+7s\n" + "-----END CERTIFICATE-----\n") + x509 = X509() + x509.parse(certificate) + cls.x509 = x509 + + def test_verify_ecdsa_signature_in_TLS1_2_SHA512(self): + skemsg = a2b_hex( + "00009103001741048803928b0f1448237646bd5ae80b5144b315eb" + "f083212f62db03bfd20ff1ec83b086a6b642e9147953b65518b94fdd" + "b7946fa08726478e5d2543e833c24f57da060300483046022100b3ee" + "ead2f6b30b905ce674f6b7c9e5e4e59239931a7836bb18be03f39e60" + "a81c022100b9a064aead86af8e59aaaa30ca57e06f05e0ede23e4745" + "524d830f5b85c7fa14") + parser = Parser(skemsg) + + ske = ServerKeyExchange( + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + (3, 3)) + ske.parse(parser) + + client_random = a2b_hex("5078aff2993c6cc0d5bbc014a60e348890c" + "ef321469d9f5ecc270be5e453e7c9") + server_random = a2b_hex("aa14012f6c6070b585fa53ba1010d5c4c08" + "7314bd272cd52734c44c8f6037679") + + + KeyExchange.verifyServerKeyExchange(ske, + self.x509.publicKey, + client_random, + server_random, + [(HashAlgorithm.sha512, + SignatureAlgorithm.ecdsa)]) + + def test_verify_ecdsa_signature_in_TLS1_2_SHA1(self): + skemsg = a2b_hex( + "00008f0300174104677708522c34" + "792f4a71864854bc439134baf70cf9ec887db4f8" + "ad39f87071c284f5a07975de42b0beec9dfe08c3" + "ee3cdf53c49daa57aadfddee9c3be3ca05670203" + "0046304402206e3f278d3b54108b40df17c71ac6" + "8a801c7bb863a7c477fd8a21b680ca02fbeb0220" + "1b497a72f9af66f406d1146971623d7087710641" + "dfaff5cfd575a8359165c18f") + parser = Parser(skemsg) + + ske = ServerKeyExchange( + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + (3, 3)) + ske.parse(parser) + + client_random = a2b_hex("affab7761c9d4882d10c17757f648" + "76c04a25c0ccdbfa98d9c6a545794" + "ab566c") + server_random = a2b_hex("21b01edc3232325bc6d761e9d4fea" + "ccd811051c5bc5f3e09d769a5e15d" + "d67273") + + KeyExchange.verifyServerKeyExchange(ske, + self.x509.publicKey, + client_random, + server_random, + [(HashAlgorithm.sha1, + SignatureAlgorithm.ecdsa)]) + + def test_verify_ecdsa_signature_in_TLS1_2_SHA256(self): + skemsg = a2b_hex( + "0000900300174104677708522c34792f4a71864854" + "bc439134baf70cf9ec887db4f8ad39f87071c284f5a0" + "7975de42b0beec9dfe08c3ee3cdf53c49daa57aadfdd" + "ee9c3be3ca05670403004730450220762a8a7bfe61b9" + "13f92f396908c889c4d12812057fe2f41b49c4bf572d" + "a3ec17022100d02bbc51221eb00702856981a36a0958" + "fda7f807f0881c677d20a5cc5cac03f4") + parser = Parser(skemsg) + + ske = ServerKeyExchange( + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + (3, 3)) + ske.parse(parser) + + client_random = a2b_hex("cd10871a3d49e42ec2a9e6fc871d1049" + "86f5b9c91f4d3f9d693290a611424d2f") + server_random = a2b_hex("109f6344e1fad353b2767f63ea152474" + "bb12d21f5bd903880a30bda436f31684") + + KeyExchange.verifyServerKeyExchange(ske, + self.x509.publicKey, + client_random, + server_random, + [(HashAlgorithm.sha256, + SignatureAlgorithm.ecdsa)]) + + def test_verify_ecdsa_signature_with_mismatched_hash(self): + # valid SKE but changed sha256 ID to SHA1 ID + skemsg = a2b_hex( + "0000900300174104677708522c34792f4a71864854" + "bc439134baf70cf9ec887db4f8ad39f87071c284f5a0" + "7975de42b0beec9dfe08c3ee3cdf53c49daa57aadfdd" + "ee9c3be3ca05670203004730450220762a8a7bfe61b9" + "13f92f396908c889c4d12812057fe2f41b49c4bf572d" + "a3ec17022100d02bbc51221eb00702856981a36a0958" + "fda7f807f0881c677d20a5cc5cac03f4") + parser = Parser(skemsg) + + ske = ServerKeyExchange( + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + (3, 3)) + ske.parse(parser) + + client_random = a2b_hex("cd10871a3d49e42ec2a9e6fc871d1049" + "86f5b9c91f4d3f9d693290a611424d2f") + server_random = a2b_hex("109f6344e1fad353b2767f63ea152474" + "bb12d21f5bd903880a30bda436f31684") + + with self.assertRaises(TLSDecryptionFailed): + KeyExchange.verifyServerKeyExchange(ske, + self.x509.publicKey, + client_random, + server_random, + [(HashAlgorithm.sha1, + SignatureAlgorithm.ecdsa)]) + + def test_verify_ecdsa_signature_with_unknown_alg(self): + # valid SKE but changed sha256 ID to 10 + skemsg = a2b_hex( + "0000900300174104677708522c34792f4a71864854" + "bc439134baf70cf9ec887db4f8ad39f87071c284f5a0" + "7975de42b0beec9dfe08c3ee3cdf53c49daa57aadfdd" + "ee9c3be3ca05670a03004730450220762a8a7bfe61b9" + "13f92f396908c889c4d12812057fe2f41b49c4bf572d" + "a3ec17022100d02bbc51221eb00702856981a36a0958" + "fda7f807f0881c677d20a5cc5cac03f4") + parser = Parser(skemsg) + + ske = ServerKeyExchange( + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + (3, 3)) + ske.parse(parser) + + client_random = a2b_hex("cd10871a3d49e42ec2a9e6fc871d1049" + "86f5b9c91f4d3f9d693290a611424d2f") + server_random = a2b_hex("109f6344e1fad353b2767f63ea152474" + "bb12d21f5bd903880a30bda436f31684") + + with self.assertRaises(TLSIllegalParameterException) as e: + KeyExchange.verifyServerKeyExchange(ske, + self.x509.publicKey, + client_random, + server_random, + [(0x0a, + SignatureAlgorithm.ecdsa)]) + + self.assertIn("Unknown hash algorithm", str(e.exception)) + + +class TestServerKeyExchangeP384(unittest.TestCase): + @classmethod + def setUpClass(cls): + certificate = ( + "-----BEGIN CERTIFICATE-----\n" + "MIIBqTCCATGgAwIBAgIJAOg7t3nOR8B6MAkGByqGSM49BAEwFDESMBAGA1UEAwwJ\n" + "bG9jYWxob3N0MB4XDTE3MTAyNDA4NDE0NFoXDTE3MTEyMzA4NDE0NFowFDESMBAG\n" + "A1UEAwwJbG9jYWxob3N0MHYwEAYHKoZIzj0CAQYFK4EEACIDYgAESTMngPUfYFqz\n" + "6c13TgothkDP0NNLb9BxfJ6PeX+Z2Y9Kb/xONDrAil/avCHW3OzYrZjiVrhENRcR\n" + "1mtxA2ubSlU4bJwItdRy+frJolg4b27Wl9lSpCAn3rgCff9e0puoo1AwTjAdBgNV\n" + "HQ4EFgQUZ6FxONYHIe0yOhDzNfNlogyNkg8wHwYDVR0jBBgwFoAUZ6FxONYHIe0y\n" + "OhDzNfNlogyNkg8wDAYDVR0TBAUwAwEB/zAJBgcqhkjOPQQBA2cAMGQCMASrET+o\n" + "XSFfkriYgmIW8T5tSHZ7Jys1krAS4GUEHYdTkKWSuGfM+0uqblSNgjjYjAIwPXxK\n" + "pSc6nBMwoE0NFnEa+iL8O3Zl7LDnX2AuKOaV4Id8UuW9653fRCn7CPrfaPOm\n" + "-----END CERTIFICATE-----\n") + x509 = X509() + x509.parse(certificate) + + cls.x509 = x509 + + def test_verify_ecdsa_signature_in_TLS1_2_SHA512(self): + skemsg = a2b_hex( + "0000af03001741046d571e6310febf38201af10f823241df990a2887f779e590" + "00dd8fb3ee801e0e700313225e3268c3db2d1eaf13495b99ac5fc4bff5c22d71" + "c9e867c958aafebb0603006630640230043bc6fd59d5b39296153264a10d63ae" + "8937120ca0874e7848004d4ce70d66d133af993edca59e93e31845671a1b6743" + "0230710169783ce59742bcff9884105bc85675d757cf3bc6ac3250f795ee8021" + "1f086afab96a9aafd3382c96eeb5afde2bc3") + + parser = Parser(skemsg) + + ske = ServerKeyExchange( + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + (3, 3)) + ske.parse(parser) + + client_random = a2b_hex("872eca2bd39eaca9eedb31c285f5809b" + "5fd5a51efd6d1dee4e1ce4f741920a36") + server_random = a2b_hex("d85951258d55798f93619c38ac4fdd54" + "153c5930cdf2cba6d555eec8d709e303") + + KeyExchange.verifyServerKeyExchange(ske, + self.x509.publicKey, + client_random, + server_random, + [(HashAlgorithm.sha512, + SignatureAlgorithm.ecdsa)]) + + def test_verify_ecdsa_signature_in_TLS1_2_SHA384(self): + skemsg = a2b_hex( + "0000b103001741046d571e6310febf38201af10f823241df990a2887f77" + "9e59000dd8fb3ee801e0e700313225e3268c3db2d1eaf13495b99ac5fc4" + "bff5c22d71c9e867c958aafebb050300683066023100e12366ba68c36ae" + "f04c691f0c0067d0c8025f116627c5b963154fd219a9bc27ec4a11d6d1b" + "d4b5d33de8d2dcf639501c0231008a99dad2fa99a689e25422127f12dfe" + "8fdcaea1b97cb17b6267ebdd97631e004ca323132cc66e651844b40984c" + "7aa942") + + parser = Parser(skemsg) + + ske = ServerKeyExchange( + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + (3, 3)) + ske.parse(parser) + + client_random = a2b_hex("f706a53d88a5eb52d981c9943413b4f6" + "73d7426dd4373fe517c1b881ab5713d2") + server_random = a2b_hex("d35fab56329f6ff1ac36a6fc6b98a393" + "e50bc4cd8b8bf3038f8b914f0c105cd2") + + KeyExchange.verifyServerKeyExchange(ske, + self.x509.publicKey, + client_random, + server_random, + [(HashAlgorithm.sha384, + SignatureAlgorithm.ecdsa)]) + + + def test_verify_ecdsa_signature_in_TLS1_2_SHA256(self): + skemsg = a2b_hex( + "0000b103001741046d571e6310febf38201af10f823241df990a2887f779" + "e59000dd8fb3ee801e0e700313225e3268c3db2d1eaf13495b99ac5fc4bf" + "f5c22d71c9e867c958aafebb04030068306602310080e64fbb7063b5c424" + "4e59611a763adafdbf4bc392e3af7ad29c98251a4dcfd9f59b8c39fa46a8" + "f035d90e0b35181bee023100a383176790f00b2731f85ba90e05e6814080" + "8f05860c138e0c57eb496b6411792af4662acea03968d1b192afd6dbc2d6" + ) + + parser = Parser(skemsg) + + ske = ServerKeyExchange( + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + (3, 3)) + ske.parse(parser) + + client_random = a2b_hex("2b1ffe918934adb2d66bb085bf56ba31" + "0f6568732f81abc7f60c1bc43b2b8d15") + server_random = a2b_hex("5141986a5d3b26cbc051d58c76074643" + "c62d8ba9a0aa77bceaa8ecec59771bfe") + + KeyExchange.verifyServerKeyExchange(ske, + self.x509.publicKey, + client_random, + server_random, + [(HashAlgorithm.sha256, + SignatureAlgorithm.ecdsa)]) + +class TestServerKeyExchangeP521(unittest.TestCase): + @classmethod + def setUpClass(cls): + certificate = ( + "-----BEGIN CERTIFICATE-----\n" + "MIIB9DCCAVegAwIBAgIJALLS/7HVXjvLMAkGByqGSM49BAEwFDESMBAGA1UEAwwJ\n" + "bG9jYWxob3N0MB4XDTE3MTAyNDA5MzI1OVoXDTE3MTEyMzA5MzI1OVowFDESMBAG\n" + "A1UEAwwJbG9jYWxob3N0MIGbMBAGByqGSM49AgEGBSuBBAAjA4GGAAQA2W4PjcS5\n" + "O2XC/BePOpu3qLrIKdEYPTbXPz3kX1KAMUKb7Mndl8gYhmt3orymNfyvw/TjUBeT\n" + "D9C/kH87MM0MTdIADcZOQ8Kaq1KB33bNbsXtkV29SF+070tE6B0AdbKkA51Ak1G8\n" + "FWmEZtf01e8ajcfsDLzkQenY8nD9/jdXonyRMD6jUDBOMB0GA1UdDgQWBBT8H+nt\n" + "DHosWy5fTjmDltyvBB6JUjAfBgNVHSMEGDAWgBT8H+ntDHosWy5fTjmDltyvBB6J\n" + "UjAMBgNVHRMEBTADAQH/MAkGByqGSM49BAEDgYsAMIGHAkIB8rNy9Uq2ZZwFwbdw\n" + "FBjteJEkJS26E7m3bLf5YmCmdH6wyQd+EjoPVBwOrQxcH0eR/vYEmouTlsBGxdRN\n" + "1eIm4DQCQUVPccfLbGV4KK3tkij1GH9ej9AQvLpjVMkyhwNadmGadOcIpbciQyll\n" + "+m9uHWVCSntAeSzf2A6nnVBvRvGbZu1w\n" + "-----END CERTIFICATE-----\n") + + x509 = X509() + x509.parse(certificate) + + cls.x509 = x509 + + def test_verify_ecdsa_signature_in_TLS1_2_SHA384(self): + skemsg = a2b_hex( + "0000d3030017410402f8552b8fb2ce583f6572a872373857de5a4f179c00870" + "9305391e847416a894d523759e73205b94c64a683bb61f8a6c01c7fee180591" + "24f47e77aad3b32ada0503008a30818702420153e2b6526452f2174c4b70f9c" + "de18c63bc8a70bfde5f313e7608fb799893fea45d414e9ff176a9a0a7cd1b8c" + "0d659d147501ea6482d8d43ac75e0ce6864674196102415e6f6ac717dad1b10" + "cd20e9dc3d4f6d1e483a349cc7d37ecdb68231b3b41dd60cff9068e38cbd62d" + "1203be11556991c85c6b9348b958318a91cdaa2e249ea1cb9e") + parser = Parser(skemsg) + + ske = ServerKeyExchange( + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + (3, 3)) + ske.parse(parser) + + client_random = a2b_hex("ccef6eefa66dda9e90c5e56dc3efa1ec" + "259485ebcd2ec736ad2bcb3598ac3615") + server_random = a2b_hex("739fd50e4ecbb177f882536a71828f8e" + "bcbcf3a3217da24fa3eb6f7d7b009401") + + KeyExchange.verifyServerKeyExchange(ske, + self.x509.publicKey, + client_random, + server_random, + [(HashAlgorithm.sha384, + SignatureAlgorithm.ecdsa)]) + + def test_verify_ecdsa_signature_in_TLS1_2_SHA512(self): + skemsg = a2b_hex( + "0000d3030017410402f8552b8fb2ce583f6572a872373857de5a4f179c0087" + "09305391e847416a894d523759e73205b94c64a683bb61f8a6c01c7fee180591" + "24f47e77aad3b32ada0603008a308187024200c1ab9d049e28cdd107b7c180d4" + "dc8f78970edcee88a8b8fbd1a68572d342d97fa0ad1a7d1285ae8ea387c00d2d" + "f56dcd36146460ccba99e1323078888364604c3202412388817fea69babcb482" + "cacfe92056507cb85cd840c6a19c3fbf079e67399d72c81642b11b9e89612405" + "57e39a617f25efeebcfdcf3bf68c792f3a91318b0bd695") + + parser = Parser(skemsg) + + ske = ServerKeyExchange( + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + (3, 3)) + ske.parse(parser) + + client_random = a2b_hex("455c9402792ab4443cacc8f3bc2c9815" + "7a3f3e1026a49e50fc04a9a3d2ba18d3") + server_random = a2b_hex("ae2c2a0b6f65209c10a6766e8d230eb6" + "465927ae363950430ec049d6e32cae24") + + KeyExchange.verifyServerKeyExchange(ske, + self.x509.publicKey, + client_random, + server_random, + [(HashAlgorithm.sha512, + SignatureAlgorithm.ecdsa)]) + + class TestCalcVerifyBytes(unittest.TestCase): def setUp(self): self.handshake_hashes = HandshakeHashes() diff --git a/unit_tests/test_tlslite_messages.py b/unit_tests/test_tlslite_messages.py index 967f2fa6b..bfe34be03 100644 --- a/unit_tests/test_tlslite_messages.py +++ b/unit_tests/test_tlslite_messages.py @@ -2376,6 +2376,21 @@ def test_hash(self): b'\x8e?YW\xcd\xad\xc6\x83\x91\x1d.fe,\x17y' + b'=\xc4T\x89')) + def test_hash_with_ecdsa_in_tls1_1(self): + ske = ServerKeyExchange( + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, + (3, 2)) + + ske.createECDH(ECCurveType.named_curve, + named_curve=GroupName.secp256r1, + point=bytearray(b'\x04\xff\xab')) + + hash1 = ske.hash(bytearray(32), bytearray(32)) + + self.assertEqual(hash1, + bytearray(b' \xa0\xc1P5\xf7K/\xednd' + b'\xbaQ\xedo\xa13Z\xa5}')) + def test_hash_with_rsa_pss_sha256(self): ske = ServerKeyExchange( CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA, From 11cf40c1ce844138d7ebd0dfa01c2224b356a139 Mon Sep 17 00:00:00 2001 From: Hubert Kario Date: Fri, 20 Sep 2019 18:28:09 +0200 Subject: [PATCH 03/10] ECDSA support in handshakesettings and constants --- tlslite/constants.py | 79 ++++++++++++++---- tlslite/handshakesettings.py | 30 ++++++- unit_tests/test_tlslite_constants.py | 86 ++++++++++++++++++++ unit_tests/test_tlslite_handshakesettings.py | 8 ++ 4 files changed, 183 insertions(+), 20 deletions(-) diff --git a/tlslite/constants.py b/tlslite/constants.py index 834ff0daf..bf509e12a 100644 --- a/tlslite/constants.py +++ b/tlslite/constants.py @@ -80,6 +80,9 @@ class ClientCertificateType(TLSEnum): dss_sign = 2 rsa_fixed_dh = 3 dss_fixed_dh = 4 + ecdsa_sign = 64 # RFC 8422 + rsa_fixed_ecdh = 65 # RFC 8422 + ecdsa_fixed_ecdh = 66 # RFC 8422 class SSL2HandshakeType(TLSEnum): @@ -208,6 +211,9 @@ class SignatureScheme(TLSEnum): rsa_pkcs1_sha256 = (4, 1) rsa_pkcs1_sha384 = (5, 1) rsa_pkcs1_sha512 = (6, 1) + ecdsa_secp256r1_sha256 = (4, 3) + ecdsa_secp384r1_sha384 = (5, 3) + ecdsa_secp521r1_sha512 = (6, 3) rsa_pss_rsae_sha256 = (8, 4) rsa_pss_rsae_sha384 = (8, 5) rsa_pss_rsae_sha512 = (8, 6) @@ -273,7 +279,7 @@ def getHash(scheme): kType, _, hName = vals else: kType, _, _, hName = vals - assert kType == 'rsa' + assert kType in ('rsa', 'ecdsa') return hName @@ -764,12 +770,16 @@ class CipherSuite: # ChaCha20/Poly1305 based Cipher Suites for TLS1.2 TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_draft_00 = 0xCCA1 ietfNames[0xCCA1] = 'TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_draft_00' + TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_draft_00 = 0xCCA2 + ietfNames[0xCCA2] = 'TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_draft_00' TLS_DHE_RSA_WITH_CHACHA20_POLY1305_draft_00 = 0xCCA3 ietfNames[0xCCA3] = 'TLS_DHE_RSA_WITH_CHACHA20_POLY1305_draft_00' # RFC 7905 - ChaCha20-Poly1305 Cipher Suites for TLS TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256 = 0xCCA8 ietfNames[0xCCA8] = 'TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256' + TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256 = 0xCCA9 + ietfNames[0xCCA9] = 'TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256' TLS_DHE_RSA_WITH_CHACHA20_POLY1305_SHA256 = 0xCCAA ietfNames[0xCCAA] = 'TLS_DHE_RSA_WITH_CHACHA20_POLY1305_SHA256' @@ -780,7 +790,7 @@ class CipherSuite: #: 3DES CBC ciphers tripleDESSuites = [] - tripleDESSuites.append(TLS_ECDHE_ECDSA_WITH_3DES_EDE_CBC_SHA) # unsupp + tripleDESSuites.append(TLS_ECDHE_ECDSA_WITH_3DES_EDE_CBC_SHA) tripleDESSuites.append(TLS_ECDH_ECDSA_WITH_3DES_EDE_CBC_SHA) # unsupported tripleDESSuites.append(TLS_ECDH_RSA_WITH_3DES_EDE_CBC_SHA) # unsupported tripleDESSuites.append(TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA) @@ -801,8 +811,8 @@ class CipherSuite: aes128Suites.append(TLS_RSA_WITH_AES_128_CBC_SHA256) aes128Suites.append(TLS_DHE_RSA_WITH_AES_128_CBC_SHA256) aes128Suites.append(TLS_DH_ANON_WITH_AES_128_CBC_SHA256) - aes128Suites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256) # unsupp - aes128Suites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA) # unsupported + aes128Suites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256) + aes128Suites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA) aes128Suites.append(TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256) # unsupported aes128Suites.append(TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA) # unsupported aes128Suites.append(TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256) # unsupported @@ -821,8 +831,8 @@ class CipherSuite: aes256Suites.append(TLS_RSA_WITH_AES_256_CBC_SHA256) aes256Suites.append(TLS_DHE_RSA_WITH_AES_256_CBC_SHA256) aes256Suites.append(TLS_DH_ANON_WITH_AES_256_CBC_SHA256) - aes256Suites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384) # unsupported - aes256Suites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA) # unsupported + aes256Suites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384) + aes256Suites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA) aes256Suites.append(TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA384) # unsupported aes256Suites.append(TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA) # unsupported aes256Suites.append(TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384) # unsupported @@ -836,7 +846,7 @@ class CipherSuite: aes128GcmSuites.append(TLS_RSA_WITH_AES_128_GCM_SHA256) aes128GcmSuites.append(TLS_DHE_RSA_WITH_AES_128_GCM_SHA256) aes128GcmSuites.append(TLS_DH_ANON_WITH_AES_128_GCM_SHA256) - aes128GcmSuites.append(TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256) # unsupp + aes128GcmSuites.append(TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256) aes128GcmSuites.append(TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256) # unsupp aes128GcmSuites.append(TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256) # unsupp aes128GcmSuites.append(TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256) @@ -847,7 +857,7 @@ class CipherSuite: aes256GcmSuites.append(TLS_RSA_WITH_AES_256_GCM_SHA384) aes256GcmSuites.append(TLS_DHE_RSA_WITH_AES_256_GCM_SHA384) aes256GcmSuites.append(TLS_DH_ANON_WITH_AES_256_GCM_SHA384) - aes256GcmSuites.append(TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384) # unsupp + aes256GcmSuites.append(TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384) aes256GcmSuites.append(TLS_ECDH_ECDSA_WITH_AES_256_GCM_SHA384) # unsupp aes256GcmSuites.append(TLS_ECDH_RSA_WITH_AES_256_GCM_SHA384) # unsupported aes256GcmSuites.append(TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) @@ -856,18 +866,21 @@ class CipherSuite: #: CHACHA20 cipher, 00'th IETF draft (implicit POLY1305 authenticator) chacha20draft00Suites = [] chacha20draft00Suites.append(TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_draft_00) + chacha20draft00Suites.append( + TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_draft_00) chacha20draft00Suites.append(TLS_DHE_RSA_WITH_CHACHA20_POLY1305_draft_00) #: CHACHA20 cipher (implicit POLY1305 authenticator, SHA256 PRF) chacha20Suites = [] chacha20Suites.append(TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256) + chacha20Suites.append(TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256) chacha20Suites.append(TLS_DHE_RSA_WITH_CHACHA20_POLY1305_SHA256) chacha20Suites.append(TLS_CHACHA20_POLY1305_SHA256) #: RC4 128 stream cipher rc4Suites = [] rc4Suites.append(TLS_ECDHE_RSA_WITH_RC4_128_SHA) - rc4Suites.append(TLS_ECDHE_ECDSA_WITH_RC4_128_SHA) # unsupported + rc4Suites.append(TLS_ECDHE_ECDSA_WITH_RC4_128_SHA) rc4Suites.append(TLS_ECDH_ECDSA_WITH_RC4_128_SHA) # unsupported rc4Suites.append(TLS_ECDH_RSA_WITH_RC4_128_SHA) # unsupported rc4Suites.append(TLS_DH_ANON_WITH_RC4_128_MD5) @@ -880,7 +893,7 @@ class CipherSuite: nullSuites.append(TLS_RSA_WITH_NULL_MD5) nullSuites.append(TLS_RSA_WITH_NULL_SHA) nullSuites.append(TLS_RSA_WITH_NULL_SHA256) - nullSuites.append(TLS_ECDHE_ECDSA_WITH_NULL_SHA) # unsupported + nullSuites.append(TLS_ECDHE_ECDSA_WITH_NULL_SHA) nullSuites.append(TLS_ECDH_ECDSA_WITH_NULL_SHA) # unsupported nullSuites.append(TLS_ECDH_RSA_WITH_NULL_SHA) # unsupported nullSuites.append(TLS_ECDHE_RSA_WITH_NULL_SHA) @@ -905,11 +918,11 @@ class CipherSuite: shaSuites.append(TLS_DH_ANON_WITH_AES_256_CBC_SHA) shaSuites.append(TLS_DH_ANON_WITH_3DES_EDE_CBC_SHA) shaSuites.append(TLS_RSA_WITH_NULL_SHA) - shaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA) # unsupported - shaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA) # unsupported - shaSuites.append(TLS_ECDHE_ECDSA_WITH_3DES_EDE_CBC_SHA) # unsupported - shaSuites.append(TLS_ECDHE_ECDSA_WITH_RC4_128_SHA) # unsupported - shaSuites.append(TLS_ECDHE_ECDSA_WITH_NULL_SHA) # unsupported + shaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA) + shaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA) + shaSuites.append(TLS_ECDHE_ECDSA_WITH_3DES_EDE_CBC_SHA) + shaSuites.append(TLS_ECDHE_ECDSA_WITH_RC4_128_SHA) + shaSuites.append(TLS_ECDHE_ECDSA_WITH_NULL_SHA) shaSuites.append(TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA) # unsupported shaSuites.append(TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA) # unsupported shaSuites.append(TLS_ECDH_ECDSA_WITH_3DES_EDE_CBC_SHA) # unsupported @@ -940,14 +953,14 @@ class CipherSuite: sha256Suites.append(TLS_RSA_WITH_NULL_SHA256) sha256Suites.append(TLS_DH_ANON_WITH_AES_128_CBC_SHA256) sha256Suites.append(TLS_DH_ANON_WITH_AES_256_CBC_SHA256) - sha256Suites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256) # unsupported + sha256Suites.append(TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256) sha256Suites.append(TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256) # unsupported sha256Suites.append(TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256) # unsupported sha256Suites.append(TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256) #: SHA-384 HMAC, SHA-384 PRF sha384Suites = [] - sha384Suites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384) # unsupported + sha384Suites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384) sha384Suites.append(TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA384) # unsupported sha384Suites.append(TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384) # unsupported sha384Suites.append(TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384) @@ -1011,6 +1024,28 @@ def filterForVersion(suites, minVersion, maxVersion): includeSuites.update(CipherSuite.tls13Suites) return [s for s in suites if s in includeSuites] + @staticmethod + def filter_for_certificate(suites, cert_chain): + """Return a copy of suites without ciphers incompatible with the cert. + """ + includeSuites = set([]) + includeSuites.update(CipherSuite.tls13Suites) + if cert_chain: + if cert_chain.x509List[0].certAlg in ("rsa", "rsa-pss"): + includeSuites.update(CipherSuite.certAllSuites) + if cert_chain.x509List[0].certAlg == "rsa-pss": + # suites in which RSA encryption is used can't be used with + # rsa-pss + includeSuites.symmetric_difference_update( + CipherSuite.certSuites) + if cert_chain.x509List[0].certAlg == "ecdsa": + includeSuites.update(CipherSuite.ecdheEcdsaSuites) + else: + includeSuites.update(CipherSuite.srpSuites) + includeSuites.update(CipherSuite.anonSuites) + includeSuites.update(CipherSuite.ecdhAnonSuites) + return [s for s in suites if s in includeSuites] + @staticmethod def _filterSuites(suites, settings, version=None): if version is None: @@ -1059,6 +1094,8 @@ def _filterSuites(suites, settings, version=None): keyExchangeSuites += CipherSuite.dheCertSuites if "ecdhe_rsa" in keyExchangeNames: keyExchangeSuites += CipherSuite.ecdheCertSuites + if "ecdhe_ecdsa" in keyExchangeNames: + keyExchangeSuites += CipherSuite.ecdheEcdsaSuites if "srp_sha" in keyExchangeNames: keyExchangeSuites += CipherSuite.srpSuites if "srp_sha_rsa" in keyExchangeNames: @@ -1167,6 +1204,8 @@ def getEcdheCertSuites(cls, settings, version=None): #: ECDHE key exchange, ECDSA authentication ecdheEcdsaSuites = [] + ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256) + ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_draft_00) ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384) ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256) ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384) @@ -1177,6 +1216,12 @@ def getEcdheCertSuites(cls, settings, version=None): ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_RC4_128_SHA) ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_NULL_SHA) + @classmethod + def getEcdsaSuites(cls, settings, version=None): + """Provide ECDSA authenticated ciphersuites matching settings""" + return cls._filterSuites(CipherSuite.ecdheEcdsaSuites, + settings, version) + #: anon FFDHE key exchange anonSuites = [] anonSuites.append(TLS_DH_ANON_WITH_AES_256_GCM_SHA384) diff --git a/tlslite/handshakesettings.py b/tlslite/handshakesettings.py index 028a8d023..57e9cb25d 100644 --- a/tlslite/handshakesettings.py +++ b/tlslite/handshakesettings.py @@ -21,11 +21,12 @@ # Don't allow "md5" by default MAC_NAMES = ["sha", "sha256", "sha384", "aead"] ALL_MAC_NAMES = MAC_NAMES + ["md5"] -KEY_EXCHANGE_NAMES = ["rsa", "dhe_rsa", "ecdhe_rsa", "srp_sha", "srp_sha_rsa", - "ecdh_anon", "dh_anon"] +KEY_EXCHANGE_NAMES = ["ecdhe_ecdsa", "rsa", "dhe_rsa", "ecdhe_rsa", "srp_sha", + "srp_sha_rsa", "ecdh_anon", "dh_anon"] CIPHER_IMPLEMENTATIONS = ["openssl", "pycrypto", "python"] CERTIFICATE_TYPES = ["x509"] RSA_SIGNATURE_HASHES = ["sha512", "sha384", "sha256", "sha224", "sha1"] +ECDSA_SIGNATURE_HASHES = ["sha512", "sha384", "sha256", "sha224", "sha1"] ALL_RSA_SIGNATURE_HASHES = RSA_SIGNATURE_HASHES + ["md5"] RSA_SCHEMES = ["pss", "pkcs1"] # while secp521r1 is the most secure, it's also much slower than the others @@ -145,6 +146,16 @@ class HandshakeSettings(object): The allowed hashes are: "md5", "sha1", "sha224", "sha256", "sha384" and "sha512". The default list does not include md5. + :vartype ecdsaSigHashes: list + :ivar ecdsaSigHashes: List of hashes supported (and advertised as such) for + TLS 1.2 signatures over Server Key Exchange or Certificate Verify with + ECDSA signature algorithm. + + The list is sorted from most wanted to least wanted algorithm. + + The allowed hashes are: "sha1", "sha224", "sha256", + "sha384" and "sha512". + :vartype eccCurves: list :ivar eccCurves: List of named curves that are to be supported @@ -225,6 +236,10 @@ class HandshakeSettings(object): 1.2 or lower is the highest enabled version. Must not be set to values smaller than 64. Set to None to disable support for the extension. See also: RFC 8449. + + :vartype keyExchangeNames: list + :ivar keyExchangeNames: Enabled key exchange types for the connection, + influences selected cipher suites. """ def _init_key_settings(self): @@ -249,6 +264,7 @@ def _init_misc_extensions(self): self.useExperimentalTackExtension = False self.sendFallbackSCSV = False self.useEncryptThenMAC = True + self.ecdsaSigHashes = list(ECDSA_SIGNATURE_HASHES) self.usePaddingExtension = True self.useExtendedMasterSecret = True self.requireExtendedMasterSecret = False @@ -340,6 +356,12 @@ def _sanityCheckECDHSettings(other): raise ValueError("Key shares for not enabled groups specified: {0}" .format(nonAdvertisedGroup)) + unknownSigHash = not_matching(other.ecdsaSigHashes, + ECDSA_SIGNATURE_HASHES) + if unknownSigHash: + raise ValueError("Unknown ECDSA signature hash: '{0}'".\ + format(unknownSigHash)) + unknownDHGroup = not_matching(other.dhGroups, ALL_DH_GROUP_NAMES) if unknownDHGroup: raise ValueError("Unknown FFDHE group name: '{0}'" @@ -388,7 +410,8 @@ def _sanityCheckPrimitivesNames(other): raise ValueError("Unknown RSA padding mode: '{0}'" .format(unknownRSAPad)) - if not other.rsaSigHashes and other.maxVersion >= (3, 3): + if not other.rsaSigHashes and not other.ecdsaSigHashes and \ + other.maxVersion >= (3, 3): raise ValueError("TLS 1.2 requires signature algorithms to be set") @staticmethod @@ -544,6 +567,7 @@ def _copy_key_settings(self, other): other.certificateTypes = self.certificateTypes other.rsaSigHashes = self.rsaSigHashes other.rsaSchemes = self.rsaSchemes + other.ecdsaSigHashes = self.ecdsaSigHashes # DH key params other.eccCurves = self.eccCurves other.dhParams = self.dhParams diff --git a/unit_tests/test_tlslite_constants.py b/unit_tests/test_tlslite_constants.py index 7480cce6d..913c8663f 100644 --- a/unit_tests/test_tlslite_constants.py +++ b/unit_tests/test_tlslite_constants.py @@ -8,6 +8,12 @@ import unittest2 as unittest except ImportError: import unittest +try: + import mock + from mock import call +except ImportError: + import unittest.mock as mock + from unittest.mock import call from tlslite.handshakesettings import HandshakeSettings from tlslite.constants import CipherSuite, HashAlgorithm, SignatureAlgorithm, \ @@ -157,6 +163,17 @@ def test_filterForVersion_with_TLS_1_3(self): self.assertEqual(filtered, [CipherSuite.TLS_AES_128_GCM_SHA256]) + def test_getEcdsaSuites(self): + hs = HandshakeSettings() + hs.keyExchangeNames = ["ecdhe_ecdsa"] + hs.cipherNames = ["aes128"] + + filtered = CipherSuite.getEcdsaSuites(hs) + + self.assertEqual(filtered, + [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256, + CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA]) + def test_getTLS13Suites(self): hs = HandshakeSettings() hs.maxVersion = (3, 4) @@ -171,6 +188,75 @@ def test_getTLS13Suites_with_TLS1_2(self): self.assertEqual(CipherSuite.getTLS13Suites(hs, (3, 3)), []) + def test_filter_for_certificate_with_no_cert(self): + orig_ciphers = [CipherSuite.TLS_RSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_ECDH_ANON_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_AES_128_GCM_SHA256] + + new_ciphers = CipherSuite.filter_for_certificate( + orig_ciphers, + None) + + self.assertFalse(orig_ciphers is new_ciphers) + self.assertEqual(new_ciphers, + [CipherSuite.TLS_ECDH_ANON_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_AES_128_GCM_SHA256]) + + def test_filter_for_certificate_with_rsa(self): + cert_list = mock.MagicMock() + cert = mock.MagicMock() + cert.certAlg = "rsa" + cert_list.x509List = [cert] + + orig_ciphers = [CipherSuite.TLS_RSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_ECDHE_ECDSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_AES_128_GCM_SHA256] + + new_ciphers = CipherSuite.filter_for_certificate( + orig_ciphers, cert_list) + + self.assertEqual(new_ciphers, + [CipherSuite.TLS_RSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_AES_128_GCM_SHA256]) + + def test_filter_for_certificate_with_rsa_pss(self): + cert_list = mock.MagicMock() + cert = mock.MagicMock() + cert.certAlg = "rsa-pss" + cert_list.x509List = [cert] + + orig_ciphers = [CipherSuite.TLS_RSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_ECDHE_ECDSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_AES_128_GCM_SHA256] + + new_ciphers = CipherSuite.filter_for_certificate( + orig_ciphers, cert_list) + + self.assertEqual(new_ciphers, + [CipherSuite.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_AES_128_GCM_SHA256]) + + def test_filter_for_certificate_with_ecdsa(self): + cert_list = mock.MagicMock() + cert = mock.MagicMock() + cert.certAlg = "ecdsa" + cert_list.x509List = [cert] + + orig_ciphers = [CipherSuite.TLS_RSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_ECDHE_ECDSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_AES_128_GCM_SHA256] + + new_ciphers = CipherSuite.filter_for_certificate( + orig_ciphers, cert_list) + + self.assertEqual(new_ciphers, + [CipherSuite.TLS_ECDHE_ECDSA_WITH_3DES_EDE_CBC_SHA, + CipherSuite.TLS_AES_128_GCM_SHA256]) + class TestSignatureScheme(unittest.TestCase): def test_toRepr_with_valid_value(self): diff --git a/unit_tests/test_tlslite_handshakesettings.py b/unit_tests/test_tlslite_handshakesettings.py index c1fec402a..995b6818c 100644 --- a/unit_tests/test_tlslite_handshakesettings.py +++ b/unit_tests/test_tlslite_handshakesettings.py @@ -235,15 +235,23 @@ def test_invalid_signature_algorithm(self): def test_no_signature_hashes_set_with_TLS1_2(self): hs = HandshakeSettings() hs.rsaSigHashes = [] + hs.ecdsaSigHashes = [] with self.assertRaises(ValueError): hs.validate() def test_no_signature_hashes_set_with_TLS1_1(self): hs = HandshakeSettings() hs.rsaSigHashes = [] + hs.ecdsaSigHashes = [] hs.maxVersion = (3, 2) self.assertIsNotNone(hs.validate()) + def test_invalid_signature_ecdsa_algorithm(self): + hs = HandshakeSettings() + hs.ecdsaSigHashes += ['md5'] + with self.assertRaises(ValueError): + hs.validate() + def test_invalid_curve_name(self): hs = HandshakeSettings() hs.eccCurves = ['P-256'] From 6b6ba6172e3fa5e4d34e9119d00975fff562fbda Mon Sep 17 00:00:00 2001 From: Hubert Kario Date: Fri, 20 Sep 2019 18:51:33 +0200 Subject: [PATCH 04/10] Client side support for simple ECDSA no support for client authentication with ECDSA for now --- tlslite/tlsconnection.py | 112 ++++++++++++++++++++++++++------------- 1 file changed, 75 insertions(+), 37 deletions(-) diff --git a/tlslite/tlsconnection.py b/tlslite/tlsconnection.py index e3d695882..26ca8e15b 100644 --- a/tlslite/tlsconnection.py +++ b/tlslite/tlsconnection.py @@ -659,6 +659,7 @@ def _clientSendClientHello(self, settings, session, srpUsername, cipherSuites += CipherSuite.getSrpAllSuites(settings) elif certParams: cipherSuites += CipherSuite.getTLS13Suites(settings) + cipherSuites += CipherSuite.getEcdsaSuites(settings) cipherSuites += CipherSuite.getEcdheCertSuites(settings) cipherSuites += CipherSuite.getDheCertSuites(settings) cipherSuites += CipherSuite.getCertSuites(settings) @@ -1259,12 +1260,16 @@ def _clientTLS13Handshake(self, settings, session, clientHello, None, None, None, prfName, b'server') - scheme = SignatureScheme.toRepr(signature_scheme) - padType = SignatureScheme.getPadding(scheme) - hashName = SignatureScheme.getHash(scheme) - saltLen = getattr(hashlib, hashName)().digest_size - publicKey = certificate.cert_chain.getEndEntityPublicKey() + if signature_scheme[1] == SignatureAlgorithm.ecdsa: + padType = None + hashName = HashAlgorithm.toRepr(signature_scheme[0]) + saltLen = None + else: + scheme = SignatureScheme.toRepr(signature_scheme) + padType = SignatureScheme.getPadding(scheme) + hashName = SignatureScheme.getHash(scheme) + saltLen = getattr(hashlib, hashName)().digest_size if not publicKey.verify(certificate_verify.signature, signature_context, @@ -1348,9 +1353,14 @@ def _clientTLS13Handshake(self, settings, session, clientHello, signature_scheme, None, None, None, prfName, b'client') - pad_type = SignatureScheme.getPadding(scheme) - hash_name = SignatureScheme.getHash(scheme) - salt_len = getattr(hashlib, hash_name)().digest_size + if signature_scheme[1] == SignatureAlgorithm.ecdsa: + pad_type = None + hash_name = HashAlgorithm.toRepr(signature_scheme[0]) + salt_len = None + else: + pad_type = SignatureScheme.getPadding(scheme) + hash_name = SignatureScheme.getHash(scheme) + salt_len = getattr(hashlib, hash_name)().digest_size signature = privateKey.sign(signature_context, pad_type, @@ -1533,7 +1543,8 @@ def _clientKeyExchange(self, settings, cipherSuite, keyExchange): """Perform the client side of key exchange""" # if server chose cipher suite with authentication, get the certificate - if cipherSuite in CipherSuite.certAllSuites: + if cipherSuite in CipherSuite.certAllSuites or \ + cipherSuite in CipherSuite.ecdheEcdsaSuites: for result in self._getMsg(ContentType.handshake, HandshakeType.certificate, certificateType): @@ -1568,6 +1579,7 @@ def _clientKeyExchange(self, settings, cipherSuite, #abort if Certificate Request with inappropriate ciphersuite if cipherSuite not in CipherSuite.certAllSuites \ + and cipherSuite not in CipherSuite.ecdheEcdsaSuites \ or cipherSuite in CipherSuite.srpAllSuites: for result in self._sendError(\ AlertDescription.unexpected_message, @@ -3970,7 +3982,8 @@ def _pickServerKeyExchangeSig(settings, clientHello, certList=None, for schemeID in supported: if schemeID in hashAndAlgsExt.sigalgs: name = SignatureScheme.toRepr(schemeID) - if not name and schemeID[1] == SignatureAlgorithm.rsa: + if not name and schemeID[1] in (SignatureAlgorithm.rsa, + SignatureAlgorithm.ecdsa): name = HashAlgorithm.toRepr(schemeID[0]) if name: @@ -3984,41 +3997,66 @@ def _sigHashesToList(settings, privateKey=None, certList=None, version=(3, 3)): """Convert list of valid signature hashes to array of tuples""" certType = None + publicKey = None if certList: certType = certList.x509List[0].certAlg + publicKey = certList.x509List[0].publicKey sigAlgs = [] - for schemeName in settings.rsaSchemes: - # pkcs#1 v1.5 signatures are not allowed in TLS 1.3 - if version > (3, 3) and schemeName == "pkcs1": - continue - for hashName in settings.rsaSigHashes: - # rsa-pss certificates can't be used to make PKCS#1 v1.5 - # signatures - if certType == "rsa-pss" and schemeName == "pkcs1": + if not certType or certType == "ecdsa": + for hashName in settings.ecdsaSigHashes: + # only SHA256, SHA384 and SHA512 are allowed in TLS 1.3 + if version > (3, 3) and hashName in ("sha1", "sha224"): continue - try: - # 1024 bit keys are too small to create valid - # rsa-pss-SHA512 signatures - if schemeName == 'pss' and hashName == 'sha512'\ - and privateKey and privateKey.n < 2**2047: + + # in TLS 1.3 ECDSA key curve is bound to hash + if publicKey and version > (3, 3): + size = len(publicKey) + size, r = divmod(size, 8) + size += int(bool(r)) + if size == 32 and hashName != "sha256": + continue + if size == 48 and hashName != "sha384": + continue + if size == 65 and hashName != "sha512": continue - # advertise support for both rsaEncryption and RSA-PSS OID - # key type - if certType != 'rsa-pss': - sigAlgs.append(getattr(SignatureScheme, - "rsa_{0}_rsae_{1}" - .format(schemeName, hashName))) - if certType != 'rsa': - sigAlgs.append(getattr(SignatureScheme, - "rsa_{0}_pss_{1}" - .format(schemeName, hashName))) - except AttributeError: - if schemeName == 'pkcs1': - sigAlgs.append((getattr(HashAlgorithm, hashName), - SignatureAlgorithm.rsa)) + + sigAlgs.append((getattr(HashAlgorithm, hashName), + SignatureAlgorithm.ecdsa)) + + if not certType or certType in ("rsa", "rsa-pss"): + for schemeName in settings.rsaSchemes: + # pkcs#1 v1.5 signatures are not allowed in TLS 1.3 + if version > (3, 3) and schemeName == "pkcs1": continue + + for hashName in settings.rsaSigHashes: + # rsa-pss certificates can't be used to make PKCS#1 v1.5 + # signatures + if certType == "rsa-pss" and schemeName == "pkcs1": + continue + try: + # 1024 bit keys are too small to create valid + # rsa-pss-SHA512 signatures + if schemeName == 'pss' and hashName == 'sha512'\ + and privateKey and privateKey.n < 2**2047: + continue + # advertise support for both rsaEncryption and RSA-PSS OID + # key type + if certType != 'rsa-pss': + sigAlgs.append(getattr(SignatureScheme, + "rsa_{0}_rsae_{1}" + .format(schemeName, hashName))) + if certType != 'rsa': + sigAlgs.append(getattr(SignatureScheme, + "rsa_{0}_pss_{1}" + .format(schemeName, hashName))) + except AttributeError: + if schemeName == 'pkcs1': + sigAlgs.append((getattr(HashAlgorithm, hashName), + SignatureAlgorithm.rsa)) + continue return sigAlgs @staticmethod From a28437707ce04cae4d3ec60f2b304f4472bda306 Mon Sep 17 00:00:00 2001 From: Hubert Kario Date: Fri, 20 Sep 2019 20:48:13 +0200 Subject: [PATCH 05/10] universal code for parsing private key files allow the parsePEM to parse both RSA and ECDSA key files --- tlslite/utils/python_key.py | 166 ++++++++++++++++++++ tlslite/utils/python_rsakey.py | 85 +--------- unit_tests/test_tlslite_utils_python_key.py | 99 ++++++++++++ 3 files changed, 267 insertions(+), 83 deletions(-) create mode 100644 tlslite/utils/python_key.py create mode 100644 unit_tests/test_tlslite_utils_python_key.py diff --git a/tlslite/utils/python_key.py b/tlslite/utils/python_key.py new file mode 100644 index 000000000..c8a7e7045 --- /dev/null +++ b/tlslite/utils/python_key.py @@ -0,0 +1,166 @@ + + +from .python_rsakey import Python_RSAKey +from .python_ecdsakey import Python_ECDSAKey +from .pem import dePem, pemSniff +from .asn1parser import ASN1Parser +from .cryptomath import bytesToNumber +from ecdsa.curves import NIST256p, NIST384p, NIST521p +from ecdsa.keys import SigningKey, VerifyingKey + +class Python_Key(object): + """ + Generic methods for parsing private keys from files. + + Handles both RSA and ECDSA keys, irrespective of file format. + """ + + @staticmethod + def parsePEM(s, passwordCallback=None): + """Parse a string containing a PEM-encoded .""" + + if pemSniff(s, "PRIVATE KEY"): + bytes = dePem(s, "PRIVATE KEY") + return Python_Key._parse_pkcs8(bytes) + elif pemSniff(s, "RSA PRIVATE KEY"): + bytes = dePem(s, "RSA PRIVATE KEY") + return Python_Key._parse_ssleay(bytes) + elif pemSniff(s, "EC PRIVATE KEY"): + bytes = dePem(s, "EC PRIVATE KEY") + return Python_Key._parse_ecc_ssleay(bytes) + else: + raise SyntaxError("Not a PEM private key file") + + @staticmethod + def _parse_pkcs8(bytes): + parser = ASN1Parser(bytes) + + # first element in PrivateKeyInfo is an INTEGER + version = parser.getChild(0).value + if bytesToNumber(version) != 0: + raise SyntaxError("Unrecognized PKCS8 version") + + # second element in PrivateKeyInfo is a SEQUENCE of type + # AlgorithmIdentifier + alg_ident = parser.getChild(1) + seq_len = alg_ident.getChildCount() + # first item of AlgorithmIdentifier is an OBJECT (OID) + oid = alg_ident.getChild(0) + if list(oid.value) == [42, 134, 72, 134, 247, 13, 1, 1, 1]: + key_type = "rsa" + elif list(oid.value) == [42, 134, 72, 134, 247, 13, 1, 1, 10]: + key_type = "rsa-pss" + elif list(oid.value) == [42, 134, 72, 206, 61, 2, 1]: + key_type = "ecdsa" + else: + raise SyntaxError("Unrecognized AlgorithmIdentifier: {0}" + .format(list(oid.value))) + # second item of AlgorithmIdentifier are parameters (defined by + # above algorithm) + if key_type == "rsa": + if seq_len != 2: + raise SyntaxError("Missing parameters for RSA algorithm ID") + parameters = alg_ident.getChild(1) + if parameters.value != bytearray(0): + raise SyntaxError("RSA parameters are not NULL") + elif key_type == "ecdsa": + if seq_len != 2: + raise SyntaxError("Invalid encoding of algorithm identifier") + curveID = alg_ident.getChild(1) + if list(curveID.value) == [42, 134, 72, 206, 61, 3, 1, 7]: + curve = NIST256p + elif list(curveID.value) == [43, 129, 4, 0, 34]: + curve = NIST384p + elif list(curveID.value) == [43, 129, 4, 0, 35]: + curve = NIST521p + else: + raise SyntaxError("Unknown curve") + else: # rsa-pss + pass # ignore parameters - don't apply restrictions + + if seq_len > 2: + raise SyntaxError("Invalid encoding of AlgorithmIdentifier") + + #Get the privateKey + private_key_parser = parser.getChild(2) + + #Adjust for OCTET STRING encapsulation + private_key_parser = ASN1Parser(private_key_parser.value) + + if key_type == "ecdsa": + return Python_Key._parse_ecdsa_private_key(private_key_parser, + curve) + else: + return Python_Key._parse_asn1_private_key(private_key_parser, + key_type) + + @staticmethod + def _parse_ssleay(data): + """ + Parse binary structure of the old SSLeay file format used by OpenSSL. + + For RSA keys. + """ + private_key_parser = ASN1Parser(data) + # "rsa" type as old format doesn't support rsa-pss parameters + return Python_Key._parse_asn1_private_key(private_key_parser, "rsa") + + @staticmethod + def _parse_ecc_ssleay(data): + """ + Parse binary structure of the old SSLeay file fromat used by OpenSSL. + + For ECDSA keys. + """ + private_key = SigningKey.from_der(data) + secret_mult = private_key.privkey.secret_multiplier + return Python_ECDSAKey(None, None, private_key.curve.name, + secret_mult) + + @staticmethod + def _parse_ecdsa_private_key(private, curve): + ver = private.getChild(0) + if ver.value != b'\x01': + raise SyntaxError("Unexpected EC key version") + private_key = private.getChild(1) + public_key = private.getChild(2) + # first two bytes are the ASN.1 custom type and the length of payload + # while the latter two bytes are just specification of the public + # key encoding (uncompressed) + # TODO: update ecdsa lib to be able to parse PKCS#8 files + if curve is not NIST521p: + if list(public_key.value[:1]) != [3] or \ + list(public_key.value[2:4]) != [0, 4]: + raise SyntaxError("Invalid or unsupported encoding of public key") + + pub_key = VerifyingKey.from_string(public_key.value[4:], + curve) + else: + if list(public_key.value[:3]) != [3, 129, 134] or \ + list(public_key.value[3:5]) != [0, 4]: + raise SyntaxError("Invalid or unsupported encoding of public key") + + pub_key = VerifyingKey.from_string(public_key.value[5:], + curve) + pub_x = pub_key.pubkey.point.x() + pub_y = pub_key.pubkey.point.y() + priv_key = SigningKey.from_string(private_key.value, + curve) + mult = priv_key.privkey.secret_multiplier + return Python_ECDSAKey(pub_x, pub_y, curve.name, mult) + + @staticmethod + def _parse_asn1_private_key(private_key_parser, key_type): + version = private_key_parser.getChild(0).value[0] + if version != 0: + raise SyntaxError("Unrecognized RSAPrivateKey version") + n = bytesToNumber(private_key_parser.getChild(1).value) + e = bytesToNumber(private_key_parser.getChild(2).value) + d = bytesToNumber(private_key_parser.getChild(3).value) + p = bytesToNumber(private_key_parser.getChild(4).value) + q = bytesToNumber(private_key_parser.getChild(5).value) + dP = bytesToNumber(private_key_parser.getChild(6).value) + dQ = bytesToNumber(private_key_parser.getChild(7).value) + qInv = bytesToNumber(private_key_parser.getChild(8).value) + return Python_RSAKey(n, e, d, p, q, dP, dQ, qInv, key_type) + diff --git a/tlslite/utils/python_rsakey.py b/tlslite/utils/python_rsakey.py index 2ba2a33bd..5925615dd 100644 --- a/tlslite/utils/python_rsakey.py +++ b/tlslite/utils/python_rsakey.py @@ -118,86 +118,5 @@ def generate(bits, key_type="rsa"): @deprecated_params({"data": "s", "password_callback": "passwordCallback"}) def parsePEM(data, password_callback=None): """Parse a string containing a PEM-encoded .""" - del password_callback - if pemSniff(data, "PRIVATE KEY"): - data = dePem(data, "PRIVATE KEY") - return Python_RSAKey._parse_pkcs8(data) - elif pemSniff(data, "RSA PRIVATE KEY"): - data = dePem(data, "RSA PRIVATE KEY") - return Python_RSAKey._parse_ssleay(data) - else: - raise SyntaxError("Not a PEM private key file") - - @staticmethod - def _parse_pkcs8(data): - """Parse data in the binary PKCS#8 format.""" - parser = ASN1Parser(data) - - # first element in PrivateKeyInfo is an INTEGER - version = parser.getChild(0).value - if bytesToNumber(version) != 0: - raise SyntaxError("Unrecognized PKCS8 version") - - # second element in PrivateKeyInfo is a SEQUENCE of type - # AlgorithmIdentifier - alg_ident = parser.getChild(1) - seqLen = alg_ident.getChildCount() - # first item of AlgorithmIdentifier is an OBJECT (OID) - oid = alg_ident.getChild(0) - if list(oid.value) == [42, 134, 72, 134, 247, 13, 1, 1, 1]: - key_type = "rsa" - elif list(oid.value) == [42, 134, 72, 134, 247, 13, 1, 1, 10]: - key_type = "rsa-pss" - else: - raise SyntaxError("Unrecognized AlgorithmIdentifier: {0}" - .format(list(oid.value))) - # second item of AlgorithmIdentifier are parameters (defined by - # above algorithm) - if key_type == "rsa": - if seqLen != 2: - raise SyntaxError("Missing parameters for RSA algorithm ID") - parameters = alg_ident.getChild(1) - if parameters.value != bytearray(0): - raise SyntaxError("RSA parameters are not NULL") - else: # rsa-pss - pass # ignore parameters - don't apply restrictions - - if seqLen > 2: - raise SyntaxError("Invalid encoding of AlgorithmIdentifier") - - #Get the privateKey - private_key_parser = parser.getChild(2) - - #Adjust for OCTET STRING encapsulation - private_key_parser = ASN1Parser(private_key_parser.value) - - return Python_RSAKey._parse_asn1_private_key(private_key_parser, - key_type) - - @staticmethod - def _parse_ssleay(data): - """ - Parse binary structure of the old SSLeay file format used by OpenSSL. - """ - private_key_parser = ASN1Parser(data) - - # "rsa" type as old format doesn't support rsa-pss parameters - return Python_RSAKey._parse_asn1_private_key(private_key_parser, "rsa") - - # n, e, d, etc. are standar names for those values, keep them - # pylint: disable=invalid-name - @staticmethod - def _parse_asn1_private_key(private_key_parser, key_type): - version = private_key_parser.getChild(0).value[0] - if version != 0: - raise SyntaxError("Unrecognized RSAPrivateKey version") - n = bytesToNumber(private_key_parser.getChild(1).value) - e = bytesToNumber(private_key_parser.getChild(2).value) - d = bytesToNumber(private_key_parser.getChild(3).value) - p = bytesToNumber(private_key_parser.getChild(4).value) - q = bytesToNumber(private_key_parser.getChild(5).value) - dP = bytesToNumber(private_key_parser.getChild(6).value) - dQ = bytesToNumber(private_key_parser.getChild(7).value) - qInv = bytesToNumber(private_key_parser.getChild(8).value) - return Python_RSAKey(n, e, d, p, q, dP, dQ, qInv, key_type) - # pylint: enable=invalid-name + from .python_key import Python_Key + return Python_Key.parsePEM(data, password_callback) diff --git a/unit_tests/test_tlslite_utils_python_key.py b/unit_tests/test_tlslite_utils_python_key.py new file mode 100644 index 000000000..e165c6164 --- /dev/null +++ b/unit_tests/test_tlslite_utils_python_key.py @@ -0,0 +1,99 @@ + +try: + import unittest2 as unittest +except ImportError: + import unittest + +try: + import mock + from mock import call +except ImportError: + import unittest.mock as mock + from unittest.mock import call + +from tlslite.utils.python_key import Python_Key +from tlslite.utils.python_rsakey import Python_RSAKey +from tlslite.utils.python_ecdsakey import Python_ECDSAKey + +class TestKey(unittest.TestCase): + def test_rsa_key(self): + key = ( + "-----BEGIN PRIVATE KEY-----\n" + "MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDmM/tmq/2/N0Yy\n" + "qBb2Bu2X3+zdAU+6lnUsz+pNN69nM20aNPi9QH4ekyxs/x+uj37Zlw2hDwA5pifA\n" + "44kFtOXkw1ex4lS1xxtWCuGu1YXTDgxS1I1dcNa4qdYvwlgBGyx2T0GdIDeG3sWN\n" + "WOx2OXBeW6wm0RRxqUhyI9SXaN8yQWsCaajPZxw89OJ1XaSShWrfD2xq6xmQYV6Y\n" + "P6KcIORlQG21a6BvDbewE3iM2OGAmlEv/a7OydKlerWrc3oFBBoOmgnbLuBaxkBQ\n" + "Nv0UQrRBfB+QOoD3cE3seAjkfkVV22llJ5cMn00YwFRdsRYlgNFkoS3k25YsTMb+\n" + "NqlGYnR1AgMBAAECggEAUCu6Wj9716RAZlPz6yrug/4QV8elJK5RkJG4X7wM8jwO\n" + "uxnHpuFXCv7mce9H8Vs4Kj9ZF8ZJpcof/iVACyS9C7acS+8u4T++XXDcuC7UtHQo\n" + "BpDPysMJhLZhSbC9RWVZTrq7dyVJMUdUNa3KbEIEyFfU1I/sNsll2Zpw52o2kSFe\n" + "Ip1TGcnVmFu0uKxPrlNLSSNOVQqz2fOYWBJLk98gk54HAkHpFk92FVorn17seAfS\n" + "ksF70B9X6MBUa6PDSgQfKCwGd27KBpTivx6d8QVtMNqrFq/cqZ7TwWDIq1atZ0aF\n" + "3mYXfXR0toRyYZEXaa14Ao7iCUt5D8d2IG9u3q88AQKBgQD5x6kiqO+ApY7V0S0g\n" + "SyaIdTBjYc9Rbb0qvgy0Mhq68Ekc2fBIdTLc+G9ajkVFIe5blZc0nvwgSLdRfWrJ\n" + "bFpX8SS9Aelgp0mcfXgfIpJmLrPijtgEipTCh/73GTJM3ZnHI1z6xrRP0hi1ww2Q\n" + "Z8oqF34H6glXfYHfMqy9VaGQ4QKBgQDr74T4BxiXK4dIQ0T4oRE/881dScrVz9Ok\n" + "3wPINa5bIvfqHPl3eAJgRYBkxjKRyxt29wvGtWBQvCTHvFgF9F+3v3mfXJPRHaZZ\n" + "e1VJn9Eqjz1KuArIOwSrmnCFrd9jim10Qo36AFU0myridllN/NQn4l7yYgnw2a1/\n" + "WbLYq2nSFQKBgAkJWyog2IFb+/3qUmqfrWY0byq5SCnXAYgBVi5SvbrTpKGBlPra\n" + "Gpv59PVevkzQ/HGdyNmjgtWcK92r3ugonmAeHkkkP5A6nSQnOehOdONzfxiMOG55\n" + "oQYkq2m/JJ25Sq30rpF4DN/yZuh0hRIbXyoErY+VvP7IUKGFkNBMv8qhAoGBANDV\n" + "pLPJzClanRcIfA86ukMKMPfm7kQM/gAMapOXeGow7JHr7aCiuC+wtTH+ARrtVbUa\n" + "fPD48HTl5ARroNo8cVD6idPWJPzPKsQ/l8FgVcs/GHh/qQOMwdiHDhw1R+sax0FF\n" + "+9eS3dh/lBj5uph+NufKxlHzF2t5sclsgxKnvzX1AoGAZlNZt2xn3q/kusUXLovS\n" + "WN8C3ty06qLbD99kiWqEC2gSXc94rk7K7R/1XgfxXV8uOA9eUPDBpchd9PUnhwBE\n" + "tnkuQZ0fZ1P6EpNTumeL/UvIaA2UFtqrzxxJPJQExPRqX5foT6FhXVtGrNGKw78C\n" + "Ft7IqSkjX742rx0ephmvZgE=\n" + "-----END PRIVATE KEY-----") + + parsed_key = Python_Key.parsePEM(key) + + self.assertIsInstance(parsed_key, Python_RSAKey) + + exp_n = int("29060443439214279856616714317441381282994349643640084870" + "42194472422505198384747878467307665661184232728624861572" + "46118030874616185167217887082030330066913757629456433183" + "57727014263595982166729996386221650476766003639153689499" + "85761113451052281630236293941677142748838601564606627814" + "78871504321887555454323655057925411605057705083616507918" + "02130319371355483088627276339169052633563469569700890323" + "45345689545843561543977465544801728579255200638380126710" + "78271693450544506178122783381759966742683127796190767251" + "31801425088592558384516012482302720815493207137857605058" + "06980478584101642143302393556465736571436454903701271051" + "7") + + self.assertEqual(parsed_key.n, exp_n) + + def test_ecdsa_key_pkcs8(self): + key = ( + "-----BEGIN PRIVATE KEY-----\n" + "MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgCOZr0Ovs0eCmh+XM\n" + "QWDYVpsQ+sJdjiq/itp/kYnWNSahRANCAATINGMQAl7cXlPrYzJluGOgmc8sYvae\n" + "tO2EsXKYG6lnYhudZiepVYORP8vqLyxCF/bMIuuVKOPWSfsRGo/H8pnK\n" + "-----END PRIVATE KEY-----\n") + + parsed_key = Python_Key.parsePEM(key) + + self.assertIsInstance(parsed_key, Python_ECDSAKey) + self.assertEqual(parsed_key.private_key.privkey.secret_multiplier, + int("40256217329389834316473379676481509423" + "54978248437138490984956489316429083942")) + self.assertIsNotNone(parsed_key.public_key) + + def test_ecdsa_key_ssleay(self): + key = ( + "-----BEGIN EC PRIVATE KEY-----\n" + "MHcCAQEEIAjma9Dr7NHgpoflzEFg2FabEPrCXY4qv4raf5GJ1jUmoAoGCCqGSM49\n" + "AwEHoUQDQgAEyDRjEAJe3F5T62MyZbhjoJnPLGL2nrTthLFymBupZ2IbnWYnqVWD\n" + "kT/L6i8sQhf2zCLrlSjj1kn7ERqPx/KZyg==\n" + "-----END EC PRIVATE KEY-----\n") + + parsed_key = Python_Key.parsePEM(key) + + self.assertIsInstance(parsed_key, Python_ECDSAKey) + self.assertEqual(parsed_key.private_key.privkey.secret_multiplier, + int("40256217329389834316473379676481509423" + "54978248437138490984956489316429083942")) + self.assertIsNotNone(parsed_key.public_key) From 7ca8d8a3646569b234eab1373d2252ccefea1add Mon Sep 17 00:00:00 2001 From: Hubert Kario Date: Mon, 23 Sep 2019 19:32:48 +0200 Subject: [PATCH 06/10] fixup verifying ECDSA signatures in SKE --- tlslite/messages.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tlslite/messages.py b/tlslite/messages.py index 9df8e7320..712e84fe8 100644 --- a/tlslite/messages.py +++ b/tlslite/messages.py @@ -1455,6 +1455,7 @@ def createECDH(self, curve_type, named_curve=None, point=None): self.curve_type = curve_type self.named_curve = named_curve self.ecdh_Ys = point + return self def parse(self, parser): """ From 8dbf84e14be312b58f2de5611fbb764ebefdd298 Mon Sep 17 00:00:00 2001 From: Hubert Kario Date: Mon, 23 Sep 2019 19:33:27 +0200 Subject: [PATCH 07/10] fixup parsing of ECDSA keys from X.509 certificates --- tlslite/x509.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tlslite/x509.py b/tlslite/x509.py index 18bb0cbac..bbe56a5ee 100644 --- a/tlslite/x509.py +++ b/tlslite/x509.py @@ -28,8 +28,8 @@ class X509(object): :ivar subject: The DER-encoded ASN.1 subject distinguished name. :vartype certAlg: str - :ivar certAlg: algorithm of the public key, "rsa" for RSASSA-PKCS#1 v1.5 - and "rsa-pss" for RSASSA-PSS + :ivar certAlg: algorithm of the public key, "rsa" for RSASSA-PKCS#1 v1.5, + "rsa-pss" for RSASSA-PSS, "ecdsa" for ECDSA """ def __init__(self): From d83f58284369d55881556431ac8e1571720123bc Mon Sep 17 00:00:00 2001 From: Hubert Kario Date: Mon, 23 Sep 2019 19:33:49 +0200 Subject: [PATCH 08/10] fixup parsing of ECDSA keys from X.509 certificates --- tlslite/utils/python_ecdsakey.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tlslite/utils/python_ecdsakey.py b/tlslite/utils/python_ecdsakey.py index 8a0e0b7fc..54aa2ac06 100644 --- a/tlslite/utils/python_ecdsakey.py +++ b/tlslite/utils/python_ecdsakey.py @@ -31,6 +31,7 @@ class Python_ECDSAKey(ECDSAKey): def __init__(self, x, y, curve_name, secret_multiplier=None): if not curve_name: raise ValueError("curve_name must be specified") + self.curve_name = curve_name for c in curves: if c.name == curve_name or c.openssl_name == curve_name: From e4a2292f4d39014071cc92389a12db41a446470f Mon Sep 17 00:00:00 2001 From: Hubert Kario Date: Mon, 23 Sep 2019 19:35:35 +0200 Subject: [PATCH 09/10] signing SKE with ECDSA --- tlslite/keyexchange.py | 94 ++++++++++++++++++++------ unit_tests/test_tlslite_keyexchange.py | 81 ++++++++++++++++++++++ 2 files changed, 153 insertions(+), 22 deletions(-) diff --git a/tlslite/keyexchange.py b/tlslite/keyexchange.py index 2af841849..6bbae602a 100644 --- a/tlslite/keyexchange.py +++ b/tlslite/keyexchange.py @@ -73,6 +73,32 @@ def processServerKeyExchange(self, srvPublicKey, """Process the server KEX and return premaster secret""" raise NotImplementedError() + def _tls12_sign_ecdsa_SKE(self, serverKeyExchange, sigHash=None): + try: + serverKeyExchange.hashAlg, serverKeyExchange.signAlg = \ + getattr(SignatureScheme, sigHash) + hashName = SignatureScheme.getHash(sigHash) + except AttributeError: + serverKeyExchange.hashAlg = getattr(HashAlgorithm, sigHash) + serverKeyExchange.signAlg = SignatureAlgorithm.ecdsa + hashName = sigHash + + hash_bytes = serverKeyExchange.hash(self.clientHello.random, + self.serverHello.random) + + hash_bytes = hash_bytes[:self.privateKey.private_key.curve.baselen] + + serverKeyExchange.signature = \ + self.privateKey.sign(hash_bytes, hashAlg=hashName) + + if not serverKeyExchange.signature: + raise TLSInternalError("Empty signature") + + if not self.privateKey.verify(serverKeyExchange.signature, + hash_bytes, + ecdsa.util.sigdecode_der): + raise TLSInternalError("signature validation failure") + def _tls12_signSKE(self, serverKeyExchange, sigHash=None): """Sign a TLSv1.2 SKE message.""" try: @@ -119,6 +145,8 @@ def signServerKeyExchange(self, serverKeyExchange, sigHash=None): :param sigHash: name of the signature hash to be used for signing """ if self.serverHello.server_version < (3, 3): + if self.privateKey.key_type == "ecdsa": + serverKeyExchange.signAlg = SignatureAlgorithm.ecdsa hashBytes = serverKeyExchange.hash(self.clientHello.random, self.serverHello.random) @@ -131,7 +159,10 @@ def signServerKeyExchange(self, serverKeyExchange, sigHash=None): hashBytes): raise TLSInternalError("Server Key Exchange signature invalid") else: - self._tls12_signSKE(serverKeyExchange, sigHash) + if self.privateKey.key_type == "ecdsa": + self._tls12_sign_ecdsa_SKE(serverKeyExchange, sigHash) + else: + self._tls12_signSKE(serverKeyExchange, sigHash) @staticmethod def _tls12_verify_ecdsa_SKE(serverKeyExchange, publicKey, clientRandom, @@ -225,7 +256,7 @@ def verifyServerKeyExchange(serverKeyExchange, publicKey, clientRandom, @staticmethod def calcVerifyBytes(version, handshakeHashes, signatureAlg, premasterSecret, clientRandom, serverRandom, - prf_name = None, peer_tag=b'client'): + prf_name = None, peer_tag=b'client', key_type="rsa"): """Calculate signed bytes for Certificate Verify""" if version == (3, 0): masterSecret = calcMasterSecret(version, @@ -235,15 +266,23 @@ def calcVerifyBytes(version, handshakeHashes, signatureAlg, serverRandom) verifyBytes = handshakeHashes.digestSSL(masterSecret, b"") elif version in ((3, 1), (3, 2)): - verifyBytes = handshakeHashes.digest() + if key_type != "ecdsa": + verifyBytes = handshakeHashes.digest() + else: + verifyBytes = handshakeHashes.digest("sha1") elif version == (3, 3): - scheme = SignatureScheme.toRepr(signatureAlg) - if scheme is None: - hashName = HashAlgorithm.toRepr(signatureAlg[0]) - padding = 'pkcs1' + if signatureAlg[1] != SignatureAlgorithm.ecdsa: + scheme = SignatureScheme.toRepr(signatureAlg) + if scheme is None: + hashName = HashAlgorithm.toRepr(signatureAlg[0]) + padding = 'pkcs1' + else: + hashName = SignatureScheme.getHash(scheme) + padding = SignatureScheme.getPadding(scheme) else: - hashName = SignatureScheme.getHash(scheme) - padding = SignatureScheme.getPadding(scheme) + scheme = "ecdsa" + padding = None + hashName = HashAlgorithm.toRepr(signatureAlg[0]) verifyBytes = handshakeHashes.digest(hashName) if padding == 'pkcs1': verifyBytes = RSAKey.addPKCS1Prefix(verifyBytes, hashName) @@ -262,7 +301,7 @@ def calcVerifyBytes(version, handshakeHashes, signatureAlg, handshakeHashes.digest(prf_name) verifyBytes = secureHash(verifyBytes, hash_name) else: - raise ValueError("Unsupported TLS version {}".format(version)) + raise ValueError("Unsupported TLS version {0}".format(version)) return verifyBytes @staticmethod @@ -284,6 +323,8 @@ def makeCertificateVerify(version, handshakeHashes, validSigAlgs, SSLv3 """ signatureAlgorithm = None + if privateKey.key_type == "ecdsa" and version < (3, 3): + signatureAlgorithm = (HashAlgorithm.sha1, SignatureAlgorithm.ecdsa) # in TLS 1.2 we must decide which algorithm to use for signing if version == (3, 3): serverSigAlgs = certificateRequest.supported_signature_algs @@ -295,19 +336,27 @@ def makeCertificateVerify(version, handshakeHashes, validSigAlgs, signatureAlgorithm, premasterSecret, clientRandom, - serverRandom) - scheme = SignatureScheme.toRepr(signatureAlgorithm) - # for pkcs1 signatures hash is used to add PKCS#1 prefix, but - # that was already done by calcVerifyBytes - hashName = None - saltLen = 0 - if scheme is None: - padding = 'pkcs1' + serverRandom, + privateKey.key_type) + if signatureAlgorithm and \ + signatureAlgorithm[1] == SignatureAlgorithm.ecdsa: + padding = None + hashName = HashAlgorithm.toRepr(signatureAlgorithm[0]) + saltLen = None + verifyBytes = verifyBytes[:privateKey.private_key.curve.baselen] else: - padding = SignatureScheme.getPadding(scheme) - if padding == 'pss': - hashName = SignatureScheme.getHash(scheme) - saltLen = getattr(hashlib, hashName)().digest_size + scheme = SignatureScheme.toRepr(signatureAlgorithm) + # for pkcs1 signatures hash is used to add PKCS#1 prefix, but + # that was already done by calcVerifyBytes + hashName = None + saltLen = 0 + if scheme is None: + padding = 'pkcs1' + else: + padding = SignatureScheme.getPadding(scheme) + if padding == 'pss': + hashName = SignatureScheme.getHash(scheme) + saltLen = getattr(hashlib, hashName)().digest_size signedBytes = privateKey.sign(verifyBytes, padding, @@ -321,6 +370,7 @@ def makeCertificateVerify(version, handshakeHashes, validSigAlgs, return certificateVerify + class AuthenticatedKeyExchange(KeyExchange): """ Common methods for key exchanges that authenticate Server Key Exchange diff --git a/unit_tests/test_tlslite_keyexchange.py b/unit_tests/test_tlslite_keyexchange.py index 25d28f208..03fa1e5e2 100644 --- a/unit_tests/test_tlslite_keyexchange.py +++ b/unit_tests/test_tlslite_keyexchange.py @@ -84,6 +84,27 @@ "-----END CERTIFICATE-----\n"\ ) +srv_raw_ec_key = str( + "-----BEGIN PRIVATE KEY-----\n" + "MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgCOZr0Ovs0eCmh+XM\n" + "QWDYVpsQ+sJdjiq/itp/kYnWNSahRANCAATINGMQAl7cXlPrYzJluGOgmc8sYvae\n" + "tO2EsXKYG6lnYhudZiepVYORP8vqLyxCF/bMIuuVKOPWSfsRGo/H8pnK\n" + "-----END PRIVATE KEY-----\n" + ) + +srv_raw_ec_certificate = str( + "-----BEGIN CERTIFICATE-----\n" + "MIIBbTCCARSgAwIBAgIJAPM58cskyK+yMAkGByqGSM49BAEwFDESMBAGA1UEAwwJ\n" + "bG9jYWxob3N0MB4XDTE3MTAyMzExNDI0MVoXDTE3MTEyMjExNDI0MVowFDESMBAG\n" + "A1UEAwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEyDRjEAJe\n" + "3F5T62MyZbhjoJnPLGL2nrTthLFymBupZ2IbnWYnqVWDkT/L6i8sQhf2zCLrlSjj\n" + "1kn7ERqPx/KZyqNQME4wHQYDVR0OBBYEFPfFTUg9o3t6ehLsschSnC8Te8oaMB8G\n" + "A1UdIwQYMBaAFPfFTUg9o3t6ehLsschSnC8Te8oaMAwGA1UdEwQFMAMBAf8wCQYH\n" + "KoZIzj0EAQNIADBFAiA6p0YM5ZzfW+klHPRU2r13/IfKgeRfDR3dtBngmPvxUgIh\n" + "APTeSDeJvYWVBLzyrKTeSerNDKKHU2Rt7sufipv76+7s\n" + "-----END CERTIFICATE-----\n" + ) + class TestKeyExchange(unittest.TestCase): expected_sha1_SKE = bytearray( @@ -107,6 +128,20 @@ class TestKeyExchange(unittest.TestCase): b'\n3\x9d5\x14\x05\x06nA\xa0\x19\xd5\xaa\x9d\xd1\x16\xb3\xb9\xae' b'\xd1\xe4\xc04\xc1h\xc3\xf5/\xb2\xf6P\r\x1b"\xe9\xc9\x84&\xe1Z') + expected_tls1_1_ecdsa_SKE = bytearray( + b'\x0c\x00\x00P\x03\x00\x17\x03\x04\xff\xfa\x00G0E\x02!\x00\xc6' + b'\xa5\x83\xab\x13\xb83"P\xdcl\x817\xcbS\xab\xebxo\x91K@\x19\xe0' + b'#\xfe,M\xd7R\'\xb0\x02 <\xd6\x03\xdd\x1fS\x12o\xaaa\x9e\x7f\xf1' + b')\x93\xa9cr\xa1\xb3\xa7\r\xdb\xbbV\xb2\xac\xf6ZJ\xe3\x0e' + ) + expected_tls1_2_ecdsa_SKE = bytearray( + b'\x0c\x00\x00R\x03\x00\x17\x03\x04\xff\xfa\x02\x03\x00G0E\x02!' + b'\x00\xc6\xa5\x83\xab\x13\xb83"P\xdcl\x817\xcbS\xab\xebxo\x91K@' + b'\x19\xe0#\xfe,M\xd7R\'\xb0\x02 <\xd6\x03\xdd\x1fS\x12o\xaaa\x9e' + b'\x7f\xf1)\x93\xa9cr\xa1\xb3\xa7\r\xdb\xbbV\xb2\xac\xf6ZJ\xe3\x0e' + ) + +class TestKeyExchangeBasics(TestKeyExchange): def test___init__(self): keyExchange = KeyExchange(0, None, None, None) @@ -172,6 +207,52 @@ def test_signServerKeyExchange_in_TLS1_1(self): self.assertEqual(server_key_exchange.write(), self.expected_tls1_1_SKE) + def test_signServerKeyExchange_with_sha1_ecdsa_in_TLS1_2(self): + srv_private_key = parsePEMKey(srv_raw_ec_key, private=True) + client_hello = ClientHello() + cipher_suite = CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA + server_hello = ServerHello().create((3, 3), + bytearray(32), + bytearray(0), + cipher_suite) + keyExchange = KeyExchange(cipher_suite, + client_hello, + server_hello, + srv_private_key) + + server_key_exchange = ServerKeyExchange(cipher_suite, (3, 3))\ + .createECDH(ECCurveType.named_curve, + GroupName.secp256r1, + bytearray(b'\x04\xff\xfa')) + + keyExchange.signServerKeyExchange(server_key_exchange, 'sha1') + + self.maxDiff = None + self.assertEqual(server_key_exchange.write(), + self.expected_tls1_2_ecdsa_SKE) + + def test_signServerKeyExchange_in_TLS1_1_with_ecdsa(self): + srv_private_key = parsePEMKey(srv_raw_ec_key, private=True) + client_hello = ClientHello() + cipher_suite = CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA + server_hello = ServerHello().create((3, 2), + bytearray(32), + bytearray(0), + cipher_suite) + keyExchange = KeyExchange(cipher_suite, + client_hello, + server_hello, + srv_private_key) + server_key_exchange = ServerKeyExchange(cipher_suite, (3, 2))\ + .createECDH(ECCurveType.named_curve, + GroupName.secp256r1, + bytearray(b'\x04\xff\xfa')) + + keyExchange.signServerKeyExchange(server_key_exchange) + + self.maxDiff = None + self.assertEqual(bytearray(server_key_exchange.write()), + self.expected_tls1_1_ecdsa_SKE) def test_signServerKeyExchange_in_TLS1_1_signature_invalid(self): srv_private_key = parsePEMKey(srv_raw_key, private=True) From 2087e8041725e1b4c29074ec6d81c36a8fc95f54 Mon Sep 17 00:00:00 2001 From: Hubert Kario Date: Mon, 23 Sep 2019 19:36:07 +0200 Subject: [PATCH 10/10] Server side support for simple ECDSA support for ECDSA authentication without client certificates --- tests/serverECCert.pem | 10 +++ tests/serverECKey.pem | 5 ++ tests/tlstest.py | 163 ++++++++++++++++++++++++++++++++++- tlslite/handshakesettings.py | 6 ++ tlslite/tlsconnection.py | 151 ++++++++++++++++++++++++-------- 5 files changed, 296 insertions(+), 39 deletions(-) create mode 100644 tests/serverECCert.pem create mode 100644 tests/serverECKey.pem diff --git a/tests/serverECCert.pem b/tests/serverECCert.pem new file mode 100644 index 000000000..c2cad2db7 --- /dev/null +++ b/tests/serverECCert.pem @@ -0,0 +1,10 @@ +-----BEGIN CERTIFICATE----- +MIIBbTCCARSgAwIBAgIJAPM58cskyK+yMAkGByqGSM49BAEwFDESMBAGA1UEAwwJ +bG9jYWxob3N0MB4XDTE3MTAyMzExNDI0MVoXDTE3MTEyMjExNDI0MVowFDESMBAG +A1UEAwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEyDRjEAJe +3F5T62MyZbhjoJnPLGL2nrTthLFymBupZ2IbnWYnqVWDkT/L6i8sQhf2zCLrlSjj +1kn7ERqPx/KZyqNQME4wHQYDVR0OBBYEFPfFTUg9o3t6ehLsschSnC8Te8oaMB8G +A1UdIwQYMBaAFPfFTUg9o3t6ehLsschSnC8Te8oaMAwGA1UdEwQFMAMBAf8wCQYH +KoZIzj0EAQNIADBFAiA6p0YM5ZzfW+klHPRU2r13/IfKgeRfDR3dtBngmPvxUgIh +APTeSDeJvYWVBLzyrKTeSerNDKKHU2Rt7sufipv76+7s +-----END CERTIFICATE----- diff --git a/tests/serverECKey.pem b/tests/serverECKey.pem new file mode 100644 index 000000000..a891745c0 --- /dev/null +++ b/tests/serverECKey.pem @@ -0,0 +1,5 @@ +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgCOZr0Ovs0eCmh+XM +QWDYVpsQ+sJdjiq/itp/kYnWNSahRANCAATINGMQAl7cXlPrYzJluGOgmc8sYvae +tO2EsXKYG6lnYhudZiepVYORP8vqLyxCF/bMIuuVKOPWSfsRGo/H8pnK +-----END PRIVATE KEY----- diff --git a/tests/tlstest.py b/tests/tlstest.py index 2207a1925..b0b82ea22 100755 --- a/tests/tlstest.py +++ b/tests/tlstest.py @@ -215,6 +215,83 @@ def connect(): test_no += 1 + print("Test {0} - good X.509 ECDSA, SSLv3".format(test_no)) + synchro.recv(1) + connection = connect() + settings = HandshakeSettings() + settings.minVersion = (3, 0) + settings.maxVersion = (3, 0) + connection.handshakeClientCert(settings=settings) + testConnClient(connection) + assert connection.session.cipherSuite in\ + constants.CipherSuite.ecdheEcdsaSuites + assert isinstance(connection.session.serverCertChain, X509CertChain) + connection.close() + + test_no += 1 + + print("Test {0} - good X.509 ECDSA, TLSv1.0".format(test_no)) + synchro.recv(1) + connection = connect() + settings = HandshakeSettings() + settings.minVersion = (3, 1) + settings.maxVersion = (3, 1) + connection.handshakeClientCert(settings=settings) + testConnClient(connection) + assert connection.session.cipherSuite in\ + constants.CipherSuite.ecdheEcdsaSuites + assert isinstance(connection.session.serverCertChain, X509CertChain) + connection.close() + + test_no += 1 + + print("Test {0} - good X.509 ECDSA, TLSv1.2".format(test_no)) + synchro.recv(1) + connection = connect() + settings = HandshakeSettings() + settings.minVersion = (3, 3) + settings.maxVersion = (3, 3) + connection.handshakeClientCert(settings=settings) + testConnClient(connection) + assert connection.session.cipherSuite in\ + constants.CipherSuite.ecdheEcdsaSuites + assert isinstance(connection.session.serverCertChain, X509CertChain) + connection.close() + + test_no += 1 + + print("Test {0} - mismatched ECDSA curve, TLSv1.2".format(test_no)) + synchro.recv(1) + connection = connect() + settings = HandshakeSettings() + settings.minVersion = (3, 3) + settings.maxVersion = (3, 3) + settings.eccCurves = ["secp384r1"] + settings.keyShares = [] + try: + connection.handshakeClientCert(settings=settings) + assert False + except TLSLocalAlert as e: + assert "certificate with curve" in str(e) + connection.close() + + test_no += 1 + + print("Test {0} - good X.509 ECDSA, TLSv1.3".format(test_no)) + synchro.recv(1) + connection = connect() + settings = HandshakeSettings() + settings.minVersion = (3, 4) + settings.maxVersion = (3, 4) + connection.handshakeClientCert(settings=settings) + testConnClient(connection) + assert connection.session.cipherSuite in\ + constants.CipherSuite.tls13Suites + assert isinstance(connection.session.serverCertChain, X509CertChain) + connection.close() + + test_no += 1 + print("Test {0} - good X.509, mismatched key_share".format(test_no)) synchro.recv(1) connection = connect() @@ -1069,10 +1146,11 @@ def connect(): s.settimeout(15) return TLSConnection(s) - x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read()) + with open(os.path.join(dir, "serverX509Cert.pem")) as f: + x509Cert = X509().parse(f.read()) x509Chain = X509CertChain([x509Cert]) - s = open(os.path.join(dir, "serverX509Key.pem")).read() - x509Key = parsePEMKey(s, private=True) + with open(os.path.join(dir, "serverX509Key.pem")) as f: + x509Key = parsePEMKey(f.read(), private=True) with open(os.path.join(dir, "serverRSAPSSSigCert.pem")) as f: x509CertRSAPSSSig = X509().parse(f.read()) @@ -1088,6 +1166,14 @@ def connect(): x509KeyRSAPSS = parsePEMKey(f.read(), private=True, implementations=["python"]) + with open(os.path.join(dir, "serverECCert.pem")) as f: + x509CertECDSA = X509().parse(f.read()) + x509ecdsaChain = X509CertChain([x509CertECDSA]) + assert x509CertECDSA.certAlg == "ecdsa" + with open(os.path.join(dir, "serverECKey.pem")) as f: + x509ecdsaKey = parsePEMKey(f.read(), private=True, + implementations=["python"]) + test_no = 0 print("Test {0} - Anonymous server handshake".format(test_no)) @@ -1187,6 +1273,77 @@ def connect(): test_no += 1 + print("Test {0} - good X.509 ECDSA, SSLv3".format(test_no)) + synchro.send(b'R') + connection = connect() + settings = HandshakeSettings() + settings.minVersion = (3, 0) + settings.maxVersion = (3, 0) + connection.handshakeServer(certChain=x509ecdsaChain, + privateKey=x509ecdsaKey, settings=settings) + assert not connection.extendedMasterSecret + testConnServer(connection) + connection.close() + + test_no += 1 + + print("Test {0} - good X.509 ECDSA, TLSv1.0".format(test_no)) + synchro.send(b'R') + connection = connect() + settings = HandshakeSettings() + settings.minVersion = (3, 1) + settings.maxVersion = (3, 1) + connection.handshakeServer(certChain=x509ecdsaChain, + privateKey=x509ecdsaKey, settings=settings) + assert connection.extendedMasterSecret + testConnServer(connection) + connection.close() + + test_no += 1 + + print("Test {0} - good X.509 ECDSA, TLSv1.2".format(test_no)) + synchro.send(b'R') + connection = connect() + settings = HandshakeSettings() + settings.minVersion = (3, 3) + settings.maxVersion = (3, 3) + connection.handshakeServer(certChain=x509ecdsaChain, + privateKey=x509ecdsaKey, settings=settings) + assert connection.extendedMasterSecret + testConnServer(connection) + connection.close() + + test_no += 1 + + print("Test {0} - mismatched ECDSA curve, TLSv1.2".format(test_no)) + synchro.send(b'R') + connection = connect() + settings = HandshakeSettings() + settings.minVersion = (3, 3) + settings.maxVersion = (3, 3) + try: + connection.handshakeServer(certChain=x509ecdsaChain, + privateKey=x509ecdsaKey, settings=settings) + except TLSRemoteAlert as e: + assert "handshake_failure" in str(e) + connection.close() + + test_no += 1 + + print("Test {0} - good X.509 ECDSA, TLSv1.3".format(test_no)) + synchro.send(b'R') + connection = connect() + settings = HandshakeSettings() + settings.minVersion = (3, 4) + settings.maxVersion = (3, 4) + connection.handshakeServer(certChain=x509ecdsaChain, + privateKey=x509ecdsaKey, settings=settings) + assert connection.extendedMasterSecret + testConnServer(connection) + connection.close() + + test_no += 1 + print("Test {0} - good X.509, mismatched key_share".format(test_no)) synchro.send(b'R') connection = connect() diff --git a/tlslite/handshakesettings.py b/tlslite/handshakesettings.py index 57e9cb25d..2c0c501a5 100644 --- a/tlslite/handshakesettings.py +++ b/tlslite/handshakesettings.py @@ -38,6 +38,12 @@ ALL_CURVE_NAMES += ["secp224r1", "secp192r1"] ALL_DH_GROUP_NAMES = ["ffdhe2048", "ffdhe3072", "ffdhe4096", "ffdhe6144", "ffdhe8192"] +CURVE_ALIASES = {"secp256r1": ('NIST256p', 'prime256v1', 'P-256'), + "secp384r1": ('NIST384p', 'P-384'), + "secp521r1": ('NIST521p', 'P-521'), + "secp256k1": ('SECP256k1',), + "secp192r1": ('NIST192p', 'P-192'), + "secp224r1": ('NIST224p', 'P-224')} KNOWN_VERSIONS = ((3, 0), (3, 1), (3, 2), (3, 3), (3, 4)) TICKET_CIPHERS = ["chacha20-poly1305", "aes256gcm", "aes128gcm"] PSK_MODES = ["psk_dhe_ke", "psk_ke"] diff --git a/tlslite/tlsconnection.py b/tlslite/tlsconnection.py index 26ca8e15b..daec60933 100644 --- a/tlslite/tlsconnection.py +++ b/tlslite/tlsconnection.py @@ -29,7 +29,7 @@ from .errors import * from .messages import * from .mathtls import * -from .handshakesettings import HandshakeSettings, KNOWN_VERSIONS +from .handshakesettings import HandshakeSettings, KNOWN_VERSIONS, CURVE_ALIASES from .handshakehashes import HandshakeHashes from .utils.tackwrapper import * from .utils.deprecations import deprecated_params @@ -38,6 +38,8 @@ AECDHKeyExchange, FFDHKeyExchange, ECDHKeyExchange from .handshakehelpers import HandshakeHelpers from .utils.cipherfactory import createAESGCM, createCHACHA20 +from ecdsa.util import sigdecode_der +from ecdsa.keys import BadSignatureError class TLSConnection(TLSRecordLayer): """ @@ -1596,7 +1598,8 @@ def _clientKeyExchange(self, settings, cipherSuite, serverCertChain = None publicKey = None - if cipherSuite in CipherSuite.certAllSuites: + if cipherSuite in CipherSuite.certAllSuites or \ + cipherSuite in CipherSuite.ecdheEcdsaSuites: # get the certificate for result in self._clientGetKeyFromChain(serverCertificate, settings, @@ -1630,7 +1633,8 @@ def _clientKeyExchange(self, settings, cipherSuite, if serverKeyExchange: # store key exchange metadata for user applications if self.version >= (3, 3) \ - and cipherSuite in CipherSuite.certAllSuites \ + and (cipherSuite in CipherSuite.certAllSuites or + cipherSuite in CipherSuite.ecdheEcdsaSuites) \ and cipherSuite not in CipherSuite.certSuites: self.serverSigAlg = (serverKeyExchange.hashAlg, serverKeyExchange.signAlg) @@ -1769,15 +1773,56 @@ def _clientGetKeyFromChain(self, certificate, settings, tackExt=None): #Get and check public key from the cert chain publicKey = cert_chain.getEndEntityPublicKey() - if len(publicKey) < settings.minKeySize: - for result in self._sendError(AlertDescription.handshake_failure, - "Other party's public key too small: %d" % len(publicKey)): - yield result - if len(publicKey) > settings.maxKeySize: - for result in self._sendError(AlertDescription.handshake_failure, - "Other party's public key too large: %d" % len(publicKey)): - yield result - + cert_type = cert_chain.x509List[0].certAlg + if cert_type == "ecdsa": + curve_name = publicKey.curve_name + for name, aliases in CURVE_ALIASES.items(): + if curve_name in aliases: + curve_name = name + break + + if self.version <= (3, 3) and curve_name not in settings.eccCurves: + for result in self._sendError( + AlertDescription.handshake_failure, + "Peer sent certificate with curve we did not " + "advertise support for: {0}".format(curve_name)): + yield result + if self.version >= (3, 4): + if curve_name not in ('secp256r1', 'secp384r1', 'secp521r1'): + for result in self._sendError( + AlertDescription.illegal_parameter, + "Peer sent certificate with curve not supported " + "in TLS 1.3: {0}".format(curve_name)): + yield result + if curve_name == 'secp256r1': + sig_alg_for_curve = 'sha256' + elif curve_name == 'secp384r1': + sig_alg_for_curve = 'sha384' + else: + assert curve_name == 'secp521r1' + sig_alg_for_curve = 'sha512' + if sig_alg_for_curve not in settings.ecdsaSigHashes: + for result in self._sendError( + AlertDescription.illegal_parameter, + "Peer selected certificate with ECDSA curve we " + "did not advertise support for: {0}" + .format(curve_name)): + yield result + else: + # for RSA keys + if len(publicKey) < settings.minKeySize: + for result in self._sendError( + AlertDescription.handshake_failure, + "Other party's public key too small: %d" % + len(publicKey)): + yield result + if len(publicKey) > settings.maxKeySize: + for result in self._sendError( + AlertDescription.handshake_failure, + "Other party's public key too large: %d" % + len(publicKey)): + yield result + # If there's no TLS Extension, look for a TACK cert if tackpyLoaded: if not tackExt: @@ -2009,7 +2054,8 @@ def _handshakeServerAsyncHelper(self, verifierDB, nextProtos = None # If not doing a certificate-based suite, discard the TACK - if not cipherSuite in CipherSuite.certAllSuites: + if not cipherSuite in CipherSuite.certAllSuites and \ + not cipherSuite in CipherSuite.ecdheEcdsaSuites: tacks = None # Prepare a TACK Extension if requested @@ -2124,7 +2170,8 @@ def _handshakeServerAsyncHelper(self, verifierDB, # Perform a certificate-based key exchange elif (cipherSuite in CipherSuite.certSuites or cipherSuite in CipherSuite.dheCertSuites or - cipherSuite in CipherSuite.ecdheCertSuites): + cipherSuite in CipherSuite.ecdheCertSuites or + cipherSuite in CipherSuite.ecdheEcdsaSuites): if cipherSuite in CipherSuite.certSuites: keyExchange = RSAKeyExchange(cipherSuite, clientHello, @@ -2138,7 +2185,8 @@ def _handshakeServerAsyncHelper(self, verifierDB, privateKey, settings.dhParams, dhGroups) - elif cipherSuite in CipherSuite.ecdheCertSuites: + elif cipherSuite in CipherSuite.ecdheCertSuites or \ + cipherSuite in CipherSuite.ecdheEcdsaSuites: acceptedCurves = self._curveNamesToList(settings) defaultCurve = getattr(GroupName, settings.defaultCurve) keyExchange = ECDHE_RSAKeyExchange(cipherSuite, @@ -2191,7 +2239,8 @@ def _handshakeServerAsyncHelper(self, verifierDB, #Create the session object self.session = Session() - if cipherSuite in CipherSuite.certAllSuites: + if cipherSuite in CipherSuite.certAllSuites or \ + cipherSuite in CipherSuite.ecdheEcdsaSuites: serverCertChain = cert_chain else: serverCertChain = None @@ -2548,22 +2597,32 @@ def _serverTLS13Handshake(self, settings, clientHello, cipherSuite, signature_scheme, None, None, None, prf_name, b'server') - padType = SignatureScheme.getPadding(scheme) - hashName = SignatureScheme.getHash(scheme) - saltLen = getattr(hashlib, hashName)().digest_size - - signature = privateKey.sign(signature_context, - padType, - hashName, - saltLen) - if not privateKey.verify(signature, signature_context, - padType, - hashName, - saltLen): - for result in self._sendError( - AlertDescription.internal_error, - "Certificate Verify signature failed"): - yield result + if signature_scheme[1] == SignatureAlgorithm.ecdsa: + hashName = HashAlgorithm.toRepr(signature_scheme[0]) + signature = privateKey.sign(signature_context, hashAlg=hashName) + if not privateKey.verify(signature, signature_context, + hashAlg=hashName): + for result in self._sendError( + AlertDescription.internal_error, + "Certificate Verify signature failed"): + yield result + else: + padType = SignatureScheme.getPadding(scheme) + hashName = SignatureScheme.getHash(scheme) + saltLen = getattr(hashlib, hashName)().digest_size + + signature = privateKey.sign(signature_context, + padType, + hashName, + saltLen) + if not privateKey.verify(signature, signature_context, + padType, + hashName, + saltLen): + for result in self._sendError( + AlertDescription.internal_error, + "Certificate Verify signature failed"): + yield result certificate_verify.create(signature, signature_scheme) self._queue_message(certificate_verify) @@ -3124,6 +3183,7 @@ def _serverGetClientHello(self, settings, cert_chain, verifierDB, cipherSuites += CipherSuite.getTLS13Suites(settings, version) if ecGroupIntersect: + cipherSuites += CipherSuite.getEcdsaSuites(settings, version) cipherSuites += CipherSuite.getEcdheCertSuites(settings, version) if ffGroupIntersect: @@ -3142,6 +3202,8 @@ def _serverGetClientHello(self, settings, cert_chain, verifierDB, cipherSuites = CipherSuite.filterForVersion(cipherSuites, minVersion=version, maxVersion=version) + cipherSuites = CipherSuite.filter_for_certificate(cipherSuites, + cert_chain) #If resumption was requested and we have a session cache... if clientHello.session_id and sessionCache: session = None @@ -3339,8 +3401,10 @@ def _serverGetClientHello(self, settings, cert_chain, verifierDB, yield result #If an RSA suite is chosen, check for certificate type intersection - if cipherSuite in CipherSuite.certAllSuites and CertificateType.x509 \ - not in clientHello.certificate_types: + if (cipherSuite in CipherSuite.certAllSuites or + cipherSuite in CipherSuite.ecdheEcdsaSuites) \ + and CertificateType.x509 \ + not in clientHello.certificate_types: for result in self._sendError(\ AlertDescription.handshake_failure, "the client doesn't support my certificate type"): @@ -3561,9 +3625,13 @@ def _serverSRPKeyExchange(self, clientHello, serverHello, verifierDB, try: serverKeyExchange = keyExchange.makeServerKeyExchange(sigHash) except TLSUnknownPSKIdentity: - for result in self._sendError(\ + for result in self._sendError( AlertDescription.unknown_psk_identity): yield result + except TLSInsufficientSecurity: + for result in self._sendError( + AlertDescription.insufficient_security): + yield result #Send ServerHello[, Certificate], ServerKeyExchange, #ServerHelloDone @@ -3618,7 +3686,18 @@ def _serverCertKeyExchange(self, clientHello, serverHello, AlertDescription.handshake_failure, str(alert)): yield result - serverKeyExchange = keyExchange.makeServerKeyExchange(sigHashAlg) + try: + serverKeyExchange = keyExchange.makeServerKeyExchange(sigHashAlg) + except TLSInternalError as alert: + for result in self._sendError( + AlertDescription.internal_error, + str(alert)): + yield result + except TLSInsufficientSecurity as alert: + for result in self._sendError( + AlertDescription.insufficient_security, + str(alert)): + yield result if serverKeyExchange is not None: msgs.append(serverKeyExchange) if reqCert: