Skip to content

Commit

Permalink
verifying DSA signatures in SKE
Browse files Browse the repository at this point in the history
  • Loading branch information
FrantisekKrenzelok authored and fkrenzel committed Dec 2, 2020
1 parent 0812ed6 commit d57e316
Show file tree
Hide file tree
Showing 11 changed files with 698 additions and 73 deletions.
40 changes: 37 additions & 3 deletions tlslite/constants.py
Expand Up @@ -304,7 +304,7 @@ def getHash(scheme):
kType, _, hName = vals
else:
kType, _, _, hName = vals
assert kType in ('rsa', 'ecdsa')
assert kType in ('rsa', 'ecdsa', 'dsa')
return hName


Expand Down Expand Up @@ -960,6 +960,7 @@ class CipherSuite:
tripleDESSuites.append(TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA) # unsupported
tripleDESSuites.append(TLS_SRP_SHA_DSS_WITH_3DES_EDE_CBC_SHA) # unsupp


#: AES-128 CBC ciphers
aes128Suites = []
aes128Suites.append(TLS_SRP_SHA_WITH_AES_128_CBC_SHA)
Expand Down Expand Up @@ -1258,6 +1259,8 @@ def filter_for_certificate(suites, cert_chain):
CipherSuite.certSuites)
if cert_chain.x509List[0].certAlg == "ecdsa":
includeSuites.update(CipherSuite.ecdheEcdsaSuites)
if cert_chain.x509List[0].certAlg == "dsa":
includeSuites.update(CipherSuite.dheDsaSuites)
else:
includeSuites.update(CipherSuite.srpSuites)
includeSuites.update(CipherSuite.anonSuites)
Expand Down Expand Up @@ -1318,6 +1321,8 @@ def _filterSuites(suites, settings, version=None):
keyExchangeSuites += CipherSuite.certSuites
if "dhe_rsa" in keyExchangeNames:
keyExchangeSuites += CipherSuite.dheCertSuites
if "dhe_dsa" in keyExchangeNames:
keyExchangeSuites += CipherSuite.dheDsaSuites
if "ecdhe_rsa" in keyExchangeNames:
keyExchangeSuites += CipherSuite.ecdheCertSuites
if "ecdhe_ecdsa" in keyExchangeNames:
Expand All @@ -1326,6 +1331,8 @@ def _filterSuites(suites, settings, version=None):
keyExchangeSuites += CipherSuite.srpSuites
if "srp_sha_rsa" in keyExchangeNames:
keyExchangeSuites += CipherSuite.srpCertSuites
if "srp_sha_dsa" in keyExchangeNames:
keyExchangeSuites += CipherSuite.srpDsaSuites
if "dh_anon" in keyExchangeNames:
keyExchangeSuites += CipherSuite.anonSuites
if "ecdh_anon" in keyExchangeNames:
Expand Down Expand Up @@ -1361,8 +1368,19 @@ def getSrpCertSuites(cls, settings, version=None):
"""Return SRP cipher suites that use server certificates"""
return cls._filterSuites(CipherSuite.srpCertSuites, settings, version)

#: SRP key exchange, DSA authentication
srpDsaSuites = []
srpDsaSuites.append(TLS_SRP_SHA_DSS_WITH_3DES_EDE_CBC_SHA) # unsupported
srpDsaSuites.append(TLS_SRP_SHA_DSS_WITH_AES_128_CBC_SHA) # unsupported
srpDsaSuites.append(TLS_SRP_SHA_DSS_WITH_AES_256_CBC_SHA) # unsupported

@classmethod
def getSrpDsaSuites(cls, settings, version=None):
"""Return SRP DSA cipher suites that use server certificates"""
return cls._filterSuites(CipherSuite.srpCertSuites, settings, version)

#: All that use SRP key exchange
srpAllSuites = srpSuites + srpCertSuites
srpAllSuites = srpSuites + srpCertSuites + srpDsaSuites

@classmethod
def getSrpAllSuites(cls, settings, version=None):
Expand Down Expand Up @@ -1460,6 +1478,22 @@ def getEcdsaSuites(cls, settings, version=None):
return cls._filterSuites(CipherSuite.ecdheEcdsaSuites,
settings, version)

#: DHE key exchange, DSA authentication
dheDsaSuites = []
dheDsaSuites.append(TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA)
dheDsaSuites.append(TLS_DHE_DSS_WITH_AES_128_CBC_SHA)
dheDsaSuites.append(TLS_DHE_DSS_WITH_AES_256_CBC_SHA)
dheDsaSuites.append(TLS_DHE_DSS_WITH_AES_128_CBC_SHA256)
dheDsaSuites.append(TLS_DHE_DSS_WITH_AES_256_CBC_SHA256)
dheDsaSuites.append(TLS_DHE_DSS_WITH_AES_128_GCM_SHA256)
dheDsaSuites.append(TLS_DHE_DSS_WITH_AES_256_GCM_SHA384)

@classmethod
def getDheDsaSuites(cls, settings, version=None):
"""Provide DSA authenticated ciphersuites matching settings"""
return cls._filterSuites(CipherSuite.dheDsaSuites,
settings, version)

#: anon FFDHE key exchange
anonSuites = []
anonSuites.append(TLS_DH_ANON_WITH_AES_256_GCM_SHA384)
Expand All @@ -1476,7 +1510,7 @@ def getAnonSuites(cls, settings, version=None):
"""Provide anonymous DH ciphersuites matching settings"""
return cls._filterSuites(CipherSuite.anonSuites, settings, version)

dhAllSuites = dheCertSuites + anonSuites
dhAllSuites = dheCertSuites + anonSuites + dheDsaSuites

#: anon ECDHE key exchange
ecdhAnonSuites = []
Expand Down
21 changes: 20 additions & 1 deletion tlslite/handshakesettings.py
Expand Up @@ -28,6 +28,7 @@
CIPHER_IMPLEMENTATIONS = ["openssl", "pycrypto", "python"]
CERTIFICATE_TYPES = ["x509"]
RSA_SIGNATURE_HASHES = ["sha512", "sha384", "sha256", "sha224", "sha1"]
DSA_SIGNATURE_HASHES = ["sha512", "sha384", "sha256", "sha224", "sha1"]
ECDSA_SIGNATURE_HASHES = ["sha512", "sha384", "sha256", "sha224", "sha1"]
ALL_RSA_SIGNATURE_HASHES = RSA_SIGNATURE_HASHES + ["md5"]
RSA_SCHEMES = ["pss", "pkcs1"]
Expand Down Expand Up @@ -234,6 +235,16 @@ class HandshakeSettings(object):
The allowed hashes are: "md5", "sha1", "sha224", "sha256",
"sha384" and "sha512". The default list does not include md5.
:vartype dsaSigHashes: list(str)
:ivar dsaSigHashes: List of hashes supported (and advertised as such) for
TLS 1.2 signatures over Server Key Exchange or Certificate Verify with
DSA signature algorithm.
The list is sorted from most wanted to least wanted algorithm.
The allowed hashes are: "sha1", "sha224", "sha256",
"sha384" and "sha512".
:vartype ecdsaSigHashes: list(str)
:ivar ecdsaSigHashes: List of hashes supported (and advertised as such) for
TLS 1.2 signatures over Server Key Exchange or Certificate Verify with
Expand Down Expand Up @@ -337,6 +348,7 @@ def _init_key_settings(self):
self.maxKeySize = 8193
self.rsaSigHashes = list(RSA_SIGNATURE_HASHES)
self.rsaSchemes = list(RSA_SCHEMES)
self.dsaSigHashes = list(DSA_SIGNATURE_HASHES)
self.virtual_hosts = []
# DH key settings
self.eccCurves = list(CURVE_NAMES)
Expand Down Expand Up @@ -510,8 +522,14 @@ def _sanityCheckPrimitivesNames(other):
raise ValueError("Unknown RSA padding mode: '{0}'"
.format(unknownRSAPad))

unknownSigHash = not_matching(other.dsaSigHashes,
ALL_RSA_SIGNATURE_HASHES)
if unknownSigHash:
raise ValueError("Unknown DSA signature hash: '{0}'"
.format(unknownSigHash))

if not other.rsaSigHashes and not other.ecdsaSigHashes and \
other.maxVersion >= (3, 3):
not other.dsaSigHashes and other.maxVersion >= (3, 3):
raise ValueError("TLS 1.2 requires signature algorithms to be set")

@staticmethod
Expand Down Expand Up @@ -668,6 +686,7 @@ def _copy_key_settings(self, other):
other.certificateTypes = self.certificateTypes
other.rsaSigHashes = self.rsaSigHashes
other.rsaSchemes = self.rsaSchemes
other.dsaSigHashes = self.dsaSigHashes
other.ecdsaSigHashes = self.ecdsaSigHashes
other.virtual_hosts = self.virtual_hosts
# DH key params
Expand Down
53 changes: 53 additions & 0 deletions tlslite/keyexchange.py
Expand Up @@ -100,6 +100,34 @@ def _tls12_sign_ecdsa_SKE(self, serverKeyExchange, sigHash=None):
ecdsa.util.sigdecode_der):
raise TLSInternalError("signature validation failure")

def _tls12_sign_dsa_SKE(self, serverKeyExchange, sigHash=None):
"""Sign a TLSv1.2 SKE message."""
try:
serverKeyExchange.hashAlg, serverKeyExchange.signAlg = \
getattr(SignatureScheme, sigHash)
hashName = SignatureScheme.getHash(sigHash);

except AttributeError:
serverKeyExchange.signAlg = SignatureAlgorithm.dsa
serverKeyExchange.hashAlg = getattr(HashAlgorithm, sigHash)
keyType = 'rsa'
hashName = sigHash

hash_bytes = serverKeyExchange.hash(self.clientHello.random,
self.serverHello.random)

serverKeyExchange.signature = \
self.privateKey.sign(hashBytes,
hashAlg=hashName)

if not serverKeyExchange.signature:
raise TLSInternalError("Empty signature")

if not self.privateKey.verify(serverKeyExchange.signature,
hashBytes,
hashAlg=hashName):
raise TLSInternalError("Server Key Exchange signature invalid")

def _tls12_signSKE(self, serverKeyExchange, sigHash=None):
"""Sign a TLSv1.2 SKE message."""
try:
Expand Down Expand Up @@ -148,6 +176,8 @@ def signServerKeyExchange(self, serverKeyExchange, sigHash=None):
if self.serverHello.server_version < (3, 3):
if self.privateKey.key_type == "ecdsa":
serverKeyExchange.signAlg = SignatureAlgorithm.ecdsa
if self.privateKey.key_type == "dsa":
serverKeyExchange.signAlg = SignatureAlgorithm.dsa
hashBytes = serverKeyExchange.hash(self.clientHello.random,
self.serverHello.random)

Expand All @@ -162,6 +192,8 @@ def signServerKeyExchange(self, serverKeyExchange, sigHash=None):
else:
if self.privateKey.key_type == "ecdsa":
self._tls12_sign_ecdsa_SKE(serverKeyExchange, sigHash)
elif self.privateKey.key_type == "dsa":
self._tls12_sign_dsa_SKE(serverKeyExchange, sigHash)
else:
self._tls12_signSKE(serverKeyExchange, sigHash)

Expand All @@ -183,6 +215,19 @@ def _tls12_verify_ecdsa_SKE(serverKeyExchange, publicKey, clientRandom,
raise TLSDecryptionFailed("Server Key Exchange signature "
"invalid")

@staticmethod
def _tls12_verify_dsa_SKE(serverKeyExchange, publicKey, clientRandom,
serverRandom, validSigAlgs):
hashName = HashAlgorithm.toRepr(serverKeyExchange.hashAlg)
if not hashName:
raise TLSIllegalParameterException("Unknown hash algorithm")

hashBytes = serverKeyExchange.hash(clientRandom, serverRandom)

if not publicKey.verify(serverKeyExchange.signature, hashBytes):
raise TLSDecryptionFailed("Server Key Exchange signature "
"invalid")

@staticmethod
def _tls12_verify_SKE(serverKeyExchange, publicKey, clientRandom,
serverRandom, validSigAlgs):
Expand All @@ -198,6 +243,14 @@ def _tls12_verify_SKE(serverKeyExchange, publicKey, clientRandom,
clientRandom,
serverRandom,
validSigAlgs)

elif serverKeyExchange.signAlg == SignatureAlgorithm.dsa:
return KeyExchange._tls12_verify_dsa_SKE(serverKeyExchange,
publicKey,
clientRandom,
serverRandom,
validSigAlgs)

schemeID = (serverKeyExchange.hashAlg,
serverKeyExchange.signAlg)
scheme = SignatureScheme.toRepr(schemeID)
Expand Down
11 changes: 7 additions & 4 deletions tlslite/messages.py
Expand Up @@ -1517,7 +1517,8 @@ def parse(self, parser):
raise AssertionError()

if self.cipherSuite in CipherSuite.certAllSuites or\
self.cipherSuite in CipherSuite.ecdheEcdsaSuites:
self.cipherSuite in CipherSuite.ecdheEcdsaSuites or\
self.cipherSuite in CipherSuite.dheDsaSuites:
if self.version == (3, 3):
self.hashAlg = parser.get(1)
self.signAlg = parser.get(1)
Expand Down Expand Up @@ -1566,7 +1567,8 @@ def write(self):
writer = Writer()
writer.bytes += self.writeParams()
if self.cipherSuite in CipherSuite.certAllSuites or \
self.cipherSuite in CipherSuite.ecdheEcdsaSuites:
self.cipherSuite in CipherSuite.ecdheEcdsaSuites or \
self.cipherSuite in CipherSuite.dheDsaSuites:
if self.version >= (3, 3):
assert self.hashAlg != 0 and self.signAlg != 0
writer.add(self.hashAlg, 1)
Expand All @@ -1591,9 +1593,10 @@ def hash(self, clientRandom, serverRandom):
else:
hashAlg = SignatureScheme.getHash(sigScheme)
return secureHash(bytesToHash, hashAlg)
# ECDSA ciphers in TLS 1.1 and earlier sign the messages using
# DSA and ECDSA ciphers in TLS 1.1 and earlier sign the messages using
# SHA-1 only
if self.cipherSuite in CipherSuite.ecdheEcdsaSuites:
if self.cipherSuite in CipherSuite.ecdheEcdsaSuites or\
self.cipherSuite in CipherSuite.dheDsaSuites:
return SHA1(bytesToHash)
return MD5(bytesToHash) + SHA1(bytesToHash)

Expand Down

0 comments on commit d57e316

Please sign in to comment.