Skip to content

Commit

Permalink
Merge 866e344 into 9f2a19e
Browse files Browse the repository at this point in the history
  • Loading branch information
FrantisekKrenzelok committed Nov 13, 2020
2 parents 9f2a19e + 866e344 commit 63ec8e3
Show file tree
Hide file tree
Showing 10 changed files with 573 additions and 52 deletions.
15 changes: 13 additions & 2 deletions tlslite/constants.py
Expand Up @@ -304,7 +304,7 @@ def getHash(scheme):
kType, _, hName = vals
else:
kType, _, _, hName = vals
assert kType in ('rsa', 'ecdsa')
assert kType in ('rsa', 'ecdsa', 'dsa')
return hName


Expand Down Expand Up @@ -960,6 +960,7 @@ class CipherSuite:
tripleDESSuites.append(TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA) # unsupported
tripleDESSuites.append(TLS_SRP_SHA_DSS_WITH_3DES_EDE_CBC_SHA) # unsupp


#: AES-128 CBC ciphers
aes128Suites = []
aes128Suites.append(TLS_SRP_SHA_WITH_AES_128_CBC_SHA)
Expand Down Expand Up @@ -1454,6 +1455,16 @@ def getEcdheCertSuites(cls, settings, version=None):
ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_RC4_128_SHA)
ecdheEcdsaSuites.append(TLS_ECDHE_ECDSA_WITH_NULL_SHA)

#: DHE key exchange, DSA authentication
dheDsaSuites = []
dheDsaSuites.append(TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA)
dheDsaSuites.append(TLS_DHE_DSS_WITH_AES_128_CBC_SHA)
dheDsaSuites.append(TLS_DHE_DSS_WITH_AES_256_CBC_SHA)
dheDsaSuites.append(TLS_DHE_DSS_WITH_AES_128_CBC_SHA256)
dheDsaSuites.append(TLS_DHE_DSS_WITH_AES_256_CBC_SHA256)
dheDsaSuites.append(TLS_DHE_DSS_WITH_AES_128_GCM_SHA256)
dheDsaSuites.append(TLS_DHE_DSS_WITH_AES_256_GCM_SHA384)

@classmethod
def getEcdsaSuites(cls, settings, version=None):
"""Provide ECDSA authenticated ciphersuites matching settings"""
Expand All @@ -1476,7 +1487,7 @@ def getAnonSuites(cls, settings, version=None):
"""Provide anonymous DH ciphersuites matching settings"""
return cls._filterSuites(CipherSuite.anonSuites, settings, version)

dhAllSuites = dheCertSuites + anonSuites
dhAllSuites = dheCertSuites + anonSuites + dheDsaSuites

#: anon ECDHE key exchange
ecdhAnonSuites = []
Expand Down
21 changes: 20 additions & 1 deletion tlslite/handshakesettings.py
Expand Up @@ -28,6 +28,7 @@
CIPHER_IMPLEMENTATIONS = ["openssl", "pycrypto", "python"]
CERTIFICATE_TYPES = ["x509"]
RSA_SIGNATURE_HASHES = ["sha512", "sha384", "sha256", "sha224", "sha1"]
DSA_SIGNATURE_HASHES = ["sha512", "sha384", "sha256", "sha224", "sha1"]
ECDSA_SIGNATURE_HASHES = ["sha512", "sha384", "sha256", "sha224", "sha1"]
ALL_RSA_SIGNATURE_HASHES = RSA_SIGNATURE_HASHES + ["md5"]
RSA_SCHEMES = ["pss", "pkcs1"]
Expand Down Expand Up @@ -234,6 +235,16 @@ class HandshakeSettings(object):
The allowed hashes are: "md5", "sha1", "sha224", "sha256",
"sha384" and "sha512". The default list does not include md5.
:vartype dsaSigHashes: list(str)
:ivar dsaSigHashes: List of hashes supported (and advertised as such) for
TLS 1.2 signatures over Server Key Exchange or Certificate Verify with
DSA signature algorithm.
The list is sorted from most wanted to least wanted algorithm.
The allowed hashes are: "sha1", "sha224", "sha256",
"sha384" and "sha512".
:vartype ecdsaSigHashes: list(str)
:ivar ecdsaSigHashes: List of hashes supported (and advertised as such) for
TLS 1.2 signatures over Server Key Exchange or Certificate Verify with
Expand Down Expand Up @@ -337,6 +348,7 @@ def _init_key_settings(self):
self.maxKeySize = 8193
self.rsaSigHashes = list(RSA_SIGNATURE_HASHES)
self.rsaSchemes = list(RSA_SCHEMES)
self.dsaSigHashes = list(DSA_SIGNATURE_HASHES)
self.virtual_hosts = []
# DH key settings
self.eccCurves = list(CURVE_NAMES)
Expand Down Expand Up @@ -510,8 +522,14 @@ def _sanityCheckPrimitivesNames(other):
raise ValueError("Unknown RSA padding mode: '{0}'"
.format(unknownRSAPad))

unknownSigHash = not_matching(other.dsaSigHashes,
ALL_RSA_SIGNATURE_HASHES)
if unknownSigHash:
raise ValueError("Unknown DSA signature hash: '{0}'"
.format(unknownSigHash))

if not other.rsaSigHashes and not other.ecdsaSigHashes and \
other.maxVersion >= (3, 3):
not other.dsaSigHashes and other.maxVersion >= (3, 3):
raise ValueError("TLS 1.2 requires signature algorithms to be set")

@staticmethod
Expand Down Expand Up @@ -668,6 +686,7 @@ def _copy_key_settings(self, other):
other.certificateTypes = self.certificateTypes
other.rsaSigHashes = self.rsaSigHashes
other.rsaSchemes = self.rsaSchemes
other.dsaSigHashes = self.dsaSigHashes
other.ecdsaSigHashes = self.ecdsaSigHashes
other.virtual_hosts = self.virtual_hosts
# DH key params
Expand Down
21 changes: 21 additions & 0 deletions tlslite/keyexchange.py
Expand Up @@ -183,6 +183,19 @@ def _tls12_verify_ecdsa_SKE(serverKeyExchange, publicKey, clientRandom,
raise TLSDecryptionFailed("Server Key Exchange signature "
"invalid")

@staticmethod
def _tls12_verify_dsa_SKE(serverKeyExchange, publicKey, clientRandom,
serverRandom, validSigAlgs):
hashName = HashAlgorithm.toRepr(serverKeyExchange.hashAlg)
if not hashName:
raise TLSIllegalParameterException("Unknown hash algorithm")

hashBytes = serverKeyExchange.hash(clientRandom, serverRandom)

if not publicKey.verify(serverKeyExchange.signature, hashBytes):
raise TLSDecryptionFailed("Server Key Exchange signature "
"invalid")

@staticmethod
def _tls12_verify_SKE(serverKeyExchange, publicKey, clientRandom,
serverRandom, validSigAlgs):
Expand All @@ -198,6 +211,14 @@ def _tls12_verify_SKE(serverKeyExchange, publicKey, clientRandom,
clientRandom,
serverRandom,
validSigAlgs)

elif serverKeyExchange.signAlg == SignatureAlgorithm.dsa:
return KeyExchange._tls12_verify_dsa_SKE(serverKeyExchange,
publicKey,
clientRandom,
serverRandom,
validSigAlgs)

schemeID = (serverKeyExchange.hashAlg,
serverKeyExchange.signAlg)
scheme = SignatureScheme.toRepr(schemeID)
Expand Down
11 changes: 7 additions & 4 deletions tlslite/messages.py
Expand Up @@ -1517,7 +1517,8 @@ def parse(self, parser):
raise AssertionError()

if self.cipherSuite in CipherSuite.certAllSuites or\
self.cipherSuite in CipherSuite.ecdheEcdsaSuites:
self.cipherSuite in CipherSuite.ecdheEcdsaSuites or\
self.cipherSuite in CipherSuite.dheDsaSuites:
if self.version == (3, 3):
self.hashAlg = parser.get(1)
self.signAlg = parser.get(1)
Expand Down Expand Up @@ -1566,7 +1567,8 @@ def write(self):
writer = Writer()
writer.bytes += self.writeParams()
if self.cipherSuite in CipherSuite.certAllSuites or \
self.cipherSuite in CipherSuite.ecdheEcdsaSuites:
self.cipherSuite in CipherSuite.ecdheEcdsaSuites or \
self.cipherSuite in CipherSuite.dheDsaSuites:
if self.version >= (3, 3):
assert self.hashAlg != 0 and self.signAlg != 0
writer.add(self.hashAlg, 1)
Expand All @@ -1591,9 +1593,10 @@ def hash(self, clientRandom, serverRandom):
else:
hashAlg = SignatureScheme.getHash(sigScheme)
return secureHash(bytesToHash, hashAlg)
# ECDSA ciphers in TLS 1.1 and earlier sign the messages using
# DSA and ECDSA ciphers in TLS 1.1 and earlier sign the messages using
# SHA-1 only
if self.cipherSuite in CipherSuite.ecdheEcdsaSuites:
if self.cipherSuite in CipherSuite.ecdheEcdsaSuites or\
self.cipherSuite in CipherSuite.dheDsaSuites:
return SHA1(bytesToHash)
return MD5(bytesToHash) + SHA1(bytesToHash)

Expand Down
12 changes: 12 additions & 0 deletions tlslite/tlsconnection.py
Expand Up @@ -4425,6 +4425,18 @@ def _sigHashesToList(settings, privateKey=None, certList=None,
sigAlgs.append((getattr(HashAlgorithm, hashName),
SignatureAlgorithm.ecdsa))


if not certType or certType is "dsa":
for schemeName in settings.dsaSigHashes:
if version > (3, 3):
continue;

if version < (3, 3) and hashName != "sha1":
continue;

sigAlgs.append((getattr(HashAlgorithm, hashName),
SignatureAlgorithm.dsa))

if not certType or certType in ("rsa", "rsa-pss"):
for schemeName in settings.rsaSchemes:
# pkcs#1 v1.5 signatures are not allowed in TLS 1.3
Expand Down
41 changes: 23 additions & 18 deletions tlslite/utils/python_dsakey.py
@@ -1,12 +1,11 @@
# Author: Frantisek Krenzelok

"""Pure-Python RSA implementation."""
from ecdsa.der import encode_sequence, encode_integer, \
remove_sequence, remove_integer

from .cryptomath import getRandomNumber, getRandomPrime, \
powMod, numBits, bytesToNumber, invMod, secureHash, \
GMPY2_LOADED, gmpyLoaded
powMod, numBits, bytesToNumber, invMod, \
secureHash, GMPY2_LOADED, gmpyLoaded

if GMPY2_LOADED:
from gmpy2 import mpz
Expand Down Expand Up @@ -78,29 +77,28 @@ def generate_qp(L, N):
return (q, p)

def hashAndSign(self, data, hAlg="sha1"):
digest = bytesToNumber(secureHash(bytearray(data), hAlg))
digest_size = numBits(digest)

# extract min(|hAlg|, N) left bits of digest
hashData = (secureHash(bytearray(data), hAlg))
N = numBits(self.q)
if N < digest_size:
digest &= ~(~0 << (digest_size - N))
digest_len = len(hashData) * 8
digest = bytesToNumber(hashData)
if N < digest_len:
digest = (digest & (~0 << (digest_len - N))) >> (digest_len - N)

k = getRandomNumber(1, (self.q-1))
r = powMod(self.g, k, self.p) % self.q
s = invMod(k, self.q) * (digest + self.private_key * r) % self.q

return encode_sequence(encode_integer(r), encode_integer(s))

def hashAndVerify(self, signature, data, hAlg="sha1"):
# Get r, s components from signature
digest = bytesToNumber(secureHash(bytearray(data), hAlg))
digest_size = numBits(digest)

# extract min(|hAlg|, N) left bits of digest
def verify(self, signature, hashData):
N = numBits(self.q)
if N < digest_size:
digest &= ~(~0 << (digest_size - N))
digest_len = len(hashData) * 8
digest = bytesToNumber(hashData)

if N < digest_len:
digest = (digest & (~0 << (digest_len - N))) >> (digest_len - N)

signature = bytes(signature) #fix for python 2.6

# get r, s keys
if not signature:
Expand All @@ -113,13 +111,20 @@ def hashAndVerify(self, signature, data, hAlg="sha1"):
if rest:
return False

if gmpyLoaded or GMPY2_LOADED:
r = mpz(r)
s = mpz(s)

# check the signature
if 0 < r < self.q and 0 < s < self.q:
w = invMod(s, self.q)
u1 = (digest * w) % self.q
u2 = (r * w) % self.q
v = ((powMod(self.g, u1, self.p) * \
powMod(self.public_key, u2, self.p)) % self.p) % self.q

return r == v
return False

def hashAndVerify(self, signature, data, hAlg="sha1"):
digest = secureHash(bytearray(data), hAlg)
return self.verify(signature, digest)
1 change: 1 addition & 0 deletions unit_tests/test_tlslite_handshakesettings.py
Expand Up @@ -239,6 +239,7 @@ def test_invalid_signature_algorithm(self):
def test_no_signature_hashes_set_with_TLS1_2(self):
hs = HandshakeSettings()
hs.rsaSigHashes = []
hs.dsaSigHashes = []
hs.ecdsaSigHashes = []
with self.assertRaises(ValueError):
hs.validate()
Expand Down

0 comments on commit 63ec8e3

Please sign in to comment.