diff --git a/tlslite/utils/__init__.py b/tlslite/utils/__init__.py index 4d49df64..953af7b9 100644 --- a/tlslite/utils/__init__.py +++ b/tlslite/utils/__init__.py @@ -22,6 +22,7 @@ "python_aes", "python_rc4", "python_rsakey", + "python_dsakey", "rc4", "rijndael", "rsakey", diff --git a/tlslite/utils/dsakey.py b/tlslite/utils/dsakey.py new file mode 100644 index 00000000..fb4c62dd --- /dev/null +++ b/tlslite/utils/dsakey.py @@ -0,0 +1,33 @@ +"""Abstract class for DSA.""" + +class DSAKey(object): + """This is an abstract base class for DSA keys. + + Particular implementations of DSA keys, such as + :py:class:`~.python_dsakey.Python_DSAKey` + ... more coming + inherit from this. + + To create or parse an DSA key, don't use one of these classes + directly. Instead, use the factory functions in + :py:class:`~tlslite.utils.keyfactory`. + """ + + def __init__(self, p, q, g, x, y): + raise NotImplementedError() + + def __len__(self): + raise NotImplementedError() + + def hasPrivateKey(self): + raise NotImplementedError() + + def hashAndSign(self, data, hAlg): + raise NotImplementedError() + + def verify(self, signature, hash_bytes): + raise NotImplementedError() + + @staticmethod + def generate(L, N): + raise NotImplementedError() diff --git a/tlslite/utils/keyfactory.py b/tlslite/utils/keyfactory.py index 9b18b538..c70e7453 100644 --- a/tlslite/utils/keyfactory.py +++ b/tlslite/utils/keyfactory.py @@ -8,6 +8,7 @@ from .rsakey import RSAKey from .python_rsakey import Python_RSAKey from .python_ecdsakey import Python_ECDSAKey +from .python_dsakey import Python_DSAKey from tlslite.utils import cryptomath if cryptomath.m2cryptoLoaded: @@ -233,3 +234,28 @@ def _create_public_ecdsa_key(point_x, point_y, curve_name, if impl == "python": return Python_ECDSAKey(point_x, point_y, curve_name) raise ValueError("No acceptable implementation") + +def _create_public_dsa_key(p, q, g, y, + implementations=("python",)): + """ + Convert public key parameters into concrete implementation of verifier. + + The public key in DSA consists of four integers. + + :type p: int + :param p: domain parameter, prime num defining Gaolis Field + :type q: int + :param q: domain parameter, prime factor of p-1 + :type g: int + :param g: domain parameter, generator of q-order cyclic group GP(p) + :type y: int + :param y: public key + :type implementations: iterable of str + :param implementations: list of implementations that can be used as the + concrete implementation of the verifying key (only 'python' is + supported currently) + """ + for impl in implementations: + if impl == "python": + return Python_DSAKey(p=p, q=q, g=g, y=y) + raise ValueError("No acceptable implementation") diff --git a/tlslite/utils/python_dsakey.py b/tlslite/utils/python_dsakey.py new file mode 100644 index 00000000..40dff5a6 --- /dev/null +++ b/tlslite/utils/python_dsakey.py @@ -0,0 +1,87 @@ +from ecdsa.der import encode_sequence, encode_integer, \ +remove_sequence, remove_integer + +from .cryptomath import getRandomNumber, getRandomPrime, \ +powMod, numBits, bytesToNumber, invMod, secureHash + +from .dsakey import DSAKey + +class Python_DSAKey(DSAKey): + + def __init__(self, p=0, q=0, g=0, x=0, y=0): + self.p = p + self.q = q + self.g = g + self.private_key = x + self.public_key = y + self.key_type = "dsa" + + def __len__(self): + return numBits(self.p) + + def hasPrivateKey(self): + return bool(self.private_key) + + @staticmethod + def generate(L, N): + assert (L, N) in [(1024, 160), (2048, 224), (2048, 256), (3072, 256)] + key = Python_DSAKey() + (q, p) = Python_DSAKey.generate_qp(L, N) + + index = getRandomNumber(1, (p-1)) + g = powMod(index, int((p-1)/q), p) + x = getRandomNumber(1, q-1) + y = powMod(g, x, p) + key.q = q + key.p = p + key.g = g + key.private_key = x + key.public_key = y + return key + + @staticmethod + def generate_qp(L, N): + assert (L, N) in [(1024, 160), (2048, 224), (2048, 256), (3072, 256)] + + # TODO: optimize, docstring, implement Shawe-Taylor Random_Prime + q = int(getRandomPrime(N)) + while True: + p = int(getRandomPrime(L)) + if (p-1) % q: + break + return (q, p) + + def hashAndSign(self, data, hAlg="sha1"): + # TODO: add assert for hash size < (q-1) constrain, docstring + digest = bytesToNumber(secureHash(bytearray(data), hAlg)) + digest_size = numBits(digest) + N = numBits(self.q) + if N < digest_size: + digest &= ~(~0 << (digest_size - N)) + k = getRandomNumber(1, (self.q-1)) + r = powMod(self.g, k, self.p) % self.q + s = invMod(k, self.q) * (digest + self.private_key * r) % self.q + return encode_sequence(encode_integer(r), encode_integer(s)) + + def hashAndVerify(self, signature, data, hAlg="sha1"): + # Get r, s components from signature + digest = bytesToNumber(secureHash(bytearray(data), hAlg)) + digest_size = numBits(digest) + N = numBits(self.q) + if N < digest_size: + digest &= ~(~0 << (digest_size - N)) + + # get r, s keys + body, rest = remove_sequence(signature) + assert bytesToNumber(rest) == 0 + r, rest = remove_integer(body) + s, rest = remove_integer(rest) + assert bytesToNumber(rest) == 0 + + # check the signature + if 0 < r < self.q and 0 < s < self.q: + w = invMod(s, self.q) + u1 = (digest * w) % self.q + u2 = (r * w) % self.q + v = ((powMod(self.g, u1, self.p) * powMod(self.public_key, u2, self.p)) % self.p) % self.q + return r == v diff --git a/tlslite/utils/python_key.py b/tlslite/utils/python_key.py index 9ed48128..cf57505e 100644 --- a/tlslite/utils/python_key.py +++ b/tlslite/utils/python_key.py @@ -2,6 +2,7 @@ from .python_rsakey import Python_RSAKey from .python_ecdsakey import Python_ECDSAKey +from .python_dsakey import Python_DSAKey from .pem import dePem, pemSniff from .asn1parser import ASN1Parser from .cryptomath import bytesToNumber @@ -25,7 +26,10 @@ def parsePEM(s, passwordCallback=None): return Python_Key._parse_pkcs8(bytes) elif pemSniff(s, "RSA PRIVATE KEY"): bytes = dePem(s, "RSA PRIVATE KEY") - return Python_Key._parse_ssleay(bytes) + return Python_Key._parse_ssleay(bytes, "rsa") + elif pemSniff(s, "DSA PRIVATE KEY"): + bytes = dePem(s, "DSA PRIVATE KEY") + return Python_Key._parse_dsa_ssleay(bytes) elif pemSniff(s, "EC PRIVATE KEY"): bytes = dePem(s, "EC PRIVATE KEY") return Python_Key._parse_ecc_ssleay(bytes) @@ -51,6 +55,8 @@ def _parse_pkcs8(bytes): key_type = "rsa" elif list(oid.value) == [42, 134, 72, 134, 247, 13, 1, 1, 10]: key_type = "rsa-pss" + elif list(oid.value) == [42, 134, 72, 206, 56, 4, 1]: + key_type = "dsa" elif list(oid.value) == [42, 134, 72, 206, 61, 2, 1]: key_type = "ecdsa" else: @@ -64,6 +70,8 @@ def _parse_pkcs8(bytes): parameters = alg_ident.getChild(1) if parameters.value != bytearray(0): raise SyntaxError("RSA parameters are not NULL") + if key_type == "dsa": + parameters = alg_ident.getChild(1) elif key_type == "ecdsa": if seq_len != 2: raise SyntaxError("Invalid encoding of algorithm identifier") @@ -91,12 +99,14 @@ def _parse_pkcs8(bytes): if key_type == "ecdsa": return Python_Key._parse_ecdsa_private_key(private_key_parser, curve) + elif key_type == "dsa": + return Python_Key._parse_dsa_private_key(private_key_parser) else: return Python_Key._parse_asn1_private_key(private_key_parser, key_type) @staticmethod - def _parse_ssleay(data): + def _parse_ssleay(data, key_type="rsa"): """ Parse binary structure of the old SSLeay file format used by OpenSSL. @@ -104,7 +114,17 @@ def _parse_ssleay(data): """ private_key_parser = ASN1Parser(data) # "rsa" type as old format doesn't support rsa-pss parameters - return Python_Key._parse_asn1_private_key(private_key_parser, "rsa") + return Python_Key._parse_asn1_private_key(private_key_parser, key_type) + + @staticmethod + def _parse_dsa_ssleay(data): + """ + Parse binary structure of the old SSLeay file format used by OpenSSL. + + For DSA keys. + """ + private_key_parser = ASN1Parser(data) + return Python_Key._parse_dsa_private_key(private_key_parser) @staticmethod def _parse_ecc_ssleay(data): @@ -167,3 +187,13 @@ def _parse_asn1_private_key(private_key_parser, key_type): qInv = bytesToNumber(private_key_parser.getChild(8).value) return Python_RSAKey(n, e, d, p, q, dP, dQ, qInv, key_type) + + @staticmethod + def _parse_dsa_private_key(private_key_parser): + p = bytesToNumber(private_key_parser.getChild(1).value) + q = bytesToNumber(private_key_parser.getChild(2).value) + g = bytesToNumber(private_key_parser.getChild(3).value) + y = bytesToNumber(private_key_parser.getChild(4).value) + x = bytesToNumber(private_key_parser.getChild(5).value) + return Python_DSAKey(p, q, g, x, y) + diff --git a/tlslite/x509.py b/tlslite/x509.py index f860aac9..2fbfd48f 100644 --- a/tlslite/x509.py +++ b/tlslite/x509.py @@ -1,4 +1,4 @@ -# Authors: +# Authors: # Trevor Perrin # Google - parsing subject field # @@ -114,12 +114,14 @@ def parseBinary(self, bytes): self.certAlg = "rsa" elif list(alg_oid) == [42, 134, 72, 134, 247, 13, 1, 1, 10]: self.certAlg = "rsa-pss" + elif list(alg_oid) == [42, 134, 72, 206, 56, 4, 1]: + self.certAlg = "dsa" elif list(alg_oid) == [42, 134, 72, 206, 61, 2, 1]: self.certAlg = "ecdsa" else: raise SyntaxError("Unrecognized AlgorithmIdentifier") - # for RSA the parameters of AlgorithmIdentifier should be a NULL + # for RSA the parameters of AlgorithmIdentifier shuld be a NULL if self.certAlg == "rsa": if alg_identifier_len != 2: raise SyntaxError("Missing parameters in AlgorithmIdentifier") @@ -131,6 +133,10 @@ def parseBinary(self, bytes): self._ecdsa_pubkey_parsing( tbs_certificate.getChildBytes(subject_public_key_info_index)) return + elif self.certAlg == "dsa": + self._ecdsa_pubkey_parsing( + tbs_certificate.getChildBytes(subject_public_key_info_index)) + return else: # rsa-pss pass # ignore parameters, if any - don't apply key restrictions @@ -189,6 +195,41 @@ def _ecdsa_pubkey_parsing(self, subject_public_key_info): curve_name = public_key.curve.name self.publicKey = _create_public_ecdsa_key(x, y, curve_name) + def _dsa_pubkey_parsing(self, subject_public_key_info): + + # Get the subjectPublicKey + subject_public_key = subject_public_key_info.getChild(1) + self.subject_public_key = subject_public_key_info.getChildBytes(1) + self.subject_public_key = ASN1Parser(self.subject_public_key).value[1:] + + # Adjust for BIT STRING encapsulation + if subject_public_key.value[0]: + raise SyntaxError() + subject_public_key = ASN1Parser(subject_public_key.value[1:]) + + # Get the {A, p, q} + A = subject_public_key.getChild(0) + p = subject_public_key.getChild(1) + g = subject_public_key.getChild(2) + + # q member is in the third element of subject_public_key_info + subject_public_key = subject_public_key_info.getChild(2) + # Adjust for BIT STRING encapsulation + if subject_public_key.value[0]: + raise SyntaxError() + subject_public_key = ASN1Parser(subject_public_key.value[1:]) + # Get the {q} + q = subject_public_key.getChild(0) + + # Decode them into numbers + A = bytesToNumber(A.value) + p = bytesToNumber(p.value) + q = bytesToNumber(q.value) + g = bytesToNumber(g.value) + + # Create a public key instance + self.publicKey = _createPublicDSAKey(p, q, g, A) + def getFingerprint(self): """ Get the hex-encoded fingerprint of this certificate. diff --git a/unit_tests/test_tlslite_utils_python_dsakey.py b/unit_tests/test_tlslite_utils_python_dsakey.py new file mode 100644 index 00000000..11cb8a4a --- /dev/null +++ b/unit_tests/test_tlslite_utils_python_dsakey.py @@ -0,0 +1,116 @@ + +try: + import unittest2 as unittest +except ImportError: + import unittest + +try: + import mock + from mock import call +except ImportError: + import unittest.mock as mock + from unittest.mock import call + +from tlslite.utils.python_key import Python_Key +from tlslite.utils.python_dsakey import Python_DSAKey + +class TestDSAKey(unittest.TestCase): + @classmethod + def setUpClass(cls): + # TODO: probably change the size for faster testing + cls.key = Python_DSAKey(p=178011905478542266528237562450159990145232156369120674273274450314442865788737020770612695252123463079567156784778466449970650770920727857050009668388144034129745221171818506047231150039301079959358067395348717066319802262019714966524135060945913707594956514672855690606794135837542707371727429551343320695239, + q=864205495604807476120572616017955259175325408501, + g=174068207532402095185811980123523436538604490794561350978495831040599953488455823147851597408940950725307797094915759492368300574252438761037084473467180148876118103083043754985190983472601550494691329488083395492313850000361646482644608492304078721818959999056496097769368017749273708962006689187956744210730, + x=815960938335567045826535774426310948213969419547, + y=93112034322951472596117660754477105936016239881518384608983697589807815085172881818932620239891701018896357840324098451512424272385133659978648415057546632838052276464784739551442289054190343076593830408219991636629995759970263893544572968012605598088385656419695456948585138640681647789701298363638658702717) + + # sha1 signature of message 'some message to sign' + cls.sha1_sig = \ + bytearray(b'0E\x02!\x00\xf7Q\x97.\xcfv\x03\xf0\xff,^\xb9' + b'\nZ\xbd\x0e\xaaf\xf2]\xe0\xb0\x91\xa6cY\xa9\xff' + b'{@\x18\xc8\x02 <\x80\x1a\xfa\x14\xd2\\\x02\xfe' + b'\x1a\xb7\x07X\xba\xd8`\xd4\x1d\xa9\x9cm\xc7\xcd' + b'\x11\xbb\x1b\xd1A\xcdO\xa2?') + + def test_parse_from_pem(self): + # TODO: change the bit size from 2048 to smaller? + key = ( + "-----BEGIN DSA PRIVATE KEY-----\n" + "MIIBvQIBAAKBgQD9f1OBHXUSKVLfSpwu7OTn9hG3UjzvRADDHj+AtlEmaUVdQCJR\n" + "+1k9jVj6v8X1ujD2y5tVbNeBO4AdNG/yZmC3a5lQpaSfn+gEexAiwk+7qdf+t8Yb\n" + "+DtX58aophUPBPuD9tPFHsMCNVQTWhaRMvZ1864rYdcq7/IiAxmd0UgBxwIVAJdg\n" + "UI8VIwvMspK5gqLrhAvwWBz1AoGBAPfhoIXWmz3ey7yrXDa4V7l5lK+7+jrqgvlX\n" + "TAs9B4JnUVlXjrrUWU/mcQcQgYC0SRZxI+hMKBYTt88JMozIpuE8FnqLVHyNKOCj\n" + "rh4rs6Z1kW6jfwv6ITVi8ftiegEkO8yk8b6oUZCJqIPf4VrlnwaSi2ZegHtVJWQB\n" + "TDv+z0kqAoGBAISYj2hobCdcblJ3nDyVJ99CLlWJIkm3kZBpBsA0RQ2q2sKRWM7L\n" + "HgLUgKgX02slDEp2LkJg0H1ccuzUbphcFRP3iqY88mK74R9evDM2tani3NCA/ZLK\n" + "GI58gJqqNmdoDd6nKIrz0NarBinR0j3UTfXncRdkCwVLFa88FhmXGR19AhUAjuz0\n" + "M8NFitlfzW/FpFUWV7hYfRs=\n" + "-----END DSA PRIVATE KEY-----") + + parsed_key = Python_Key.parsePEM(key) + self.assertIsInstance(parsed_key, Python_DSAKey) + self.assertTrue(parsed_key.hasPrivateKey()) + self.assertEqual(parsed_key.private_key, self.key.private_key) + self.assertEqual(parsed_key.public_key, self.key.public_key) + self.assertEqual(parsed_key.q, self.key.q) + self.assertEqual(parsed_key.p, self.key.p) + self.assertEqual(parsed_key.g, self.key.g) + + def test_generate(self): + key = Python_DSAKey.generate(1024, 160) + self.assertIsInstance(key, Python_DSAKey) + self.assertTrue(key.hasPrivateKey()) + + def test_sign_default(self): + msg = b"some message to sign" + + sig = self.key.hashAndSign(msg) + + + def test_verify(self): + msg = b"some message to sign" + sig = self.key.hashAndSign(msg) + self.assertTrue(sig) + + def test_sign_and_verify_with_md5(self): + msg = b"some message to sign" + + sig = self.key.hashAndSign(msg, hAlg="md5") + + self.key.hashAndVerify(sig, msg, hAlg="md5") + + def test_sign_and_verify_with_sha1(self): + msg = b"some message to sign" + + sig = self.key.hashAndSign(msg, hAlg="sha1") + + self.key.hashAndVerify(sig, msg) + + def test_sign_and_verify_with_sha224(self): + msg = b"some message to sign" + + sig = self.key.hashAndSign(msg, hAlg="sha224") + + self.key.hashAndVerify(sig, msg, hAlg="sha224") + + def test_sign_and_verify_with_sha256(self): + msg = b"some message to sign" + + sig = self.key.hashAndSign(msg, hAlg="sha256") + + self.key.hashAndVerify(sig, msg, hAlg="sha256") + + def test_sign_and_verify_with_sha384(self): + msg = b"some message to sign" + + sig = self.key.hashAndSign(msg, hAlg="sha384") + + self.key.hashAndVerify(sig, msg, hAlg="sha384") + + def test_sign_and_verify_with_sha512(self): + msg = b"some message to sign" + + sig = self.key.hashAndSign(msg, hAlg="sha512") + + self.key.hashAndVerify(sig, msg, hAlg="sha512")