Skip to content

Commit

Permalink
Merge pull request #740 from tlsfuzzer/fips-compat
Browse files Browse the repository at this point in the history
FIPS compatibility for tests
  • Loading branch information
tomato42 committed Mar 5, 2021
2 parents 0aafe72 + e51a7e1 commit a3b886c
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 70 deletions.
162 changes: 106 additions & 56 deletions scripts/test-certificate-verify-malformed-sig.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,25 @@
ClientKeyExchangeGenerator, ChangeCipherSpecGenerator, \
FinishedGenerator, ApplicationDataGenerator, \
CertificateGenerator, CertificateVerifyGenerator, \
AlertGenerator
AlertGenerator, TCPBufferingEnable, TCPBufferingDisable, \
TCPBufferingFlush
from tlsfuzzer.expect import ExpectServerHello, ExpectCertificate, \
ExpectServerHelloDone, ExpectChangeCipherSpec, ExpectFinished, \
ExpectAlert, ExpectClose, ExpectCertificateRequest, \
ExpectApplicationData
ExpectApplicationData, ExpectServerKeyExchange
from tlslite.extensions import SignatureAlgorithmsExtension, \
SignatureAlgorithmsCertExtension
SignatureAlgorithmsCertExtension, SupportedGroupsExtension
from tlslite.constants import CipherSuite, AlertDescription, \
HashAlgorithm, SignatureAlgorithm, ExtensionType
HashAlgorithm, SignatureAlgorithm, ExtensionType, GroupName
from tlslite.utils.keyfactory import parsePEMKey
from tlslite.x509 import X509
from tlslite.x509certchain import X509CertChain
from tlsfuzzer.utils.lists import natural_sort_keys
from tlsfuzzer.helpers import RSA_SIG_ALL

version = 2

version = 5


def help_msg():
print("Usage: <script-name> [-h hostname] [-p port] [[probe-name] ...]")
Expand All @@ -48,6 +51,7 @@ def help_msg():
print(" usage: [-x probe-name] [-X exception], order is compulsory!")
print(" -n num run 'num' or all(if 0) tests instead of default(all)")
print(" (excluding \"sanity\" tests)")
print(" -d negotiate (EC)DHE instead of RSA key exchange")
print(" -k file.pem file with private key for client")
print(" -c file.pem file with certificate for client")
print(" --help this message")
Expand All @@ -63,9 +67,10 @@ def main():
last_exp_tmp = None
private_key = None
cert = None
dhe = False

argv = sys.argv[1:]
opts, args = getopt.getopt(argv, "h:p:e:x:X:n:k:c:", ["help"])
opts, args = getopt.getopt(argv, "h:p:e:x:X:n:k:c:d", ["help"])
for opt, arg in opts:
if opt == '-h':
host = arg
Expand Down Expand Up @@ -96,6 +101,8 @@ def main():
text_cert = str(text_cert, 'utf-8')
cert = X509()
cert.parse(text_cert)
elif opt == "-d":
dhe = True
else:
raise ValueError("Unknown option: {0}".format(opt))

Expand All @@ -112,24 +119,35 @@ def main():
conversations = {}

# sanity check for Client Certificates
sanity_hash_alg = ("sha1", "sha256")
sanity_hash_alg = ("sha1", "sha256", "sha384")
for hash_alg in sanity_hash_alg:
conversation = Connect(host, port)
node = conversation
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
if dhe:
ciphers = [CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
else:
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
ext = {ExtensionType.signature_algorithms :
SignatureAlgorithmsExtension().create([
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384', 'sha256',
'sha224', 'sha1', 'md5']]),
ExtensionType.signature_algorithms_cert :
SignatureAlgorithmsCertExtension().create(RSA_SIG_ALL)}
if dhe:
ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\
.create([GroupName.secp256r1, GroupName.ffdhe2048])
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
node = node.add_child(ExpectCertificate())
if dhe:
node = node.add_child(ExpectServerKeyExchange())
node = node.add_child(ExpectCertificateRequest())
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(TCPBufferingEnable())
node = node.add_child(CertificateGenerator(X509CertChain([cert])))
node = node.add_child(ClientKeyExchangeGenerator())
sig_type = (getattr(HashAlgorithm, hash_alg), SignatureAlgorithm.rsa)
Expand All @@ -138,6 +156,8 @@ def main():
))
node = node.add_child(ChangeCipherSpecGenerator())
node = node.add_child(FinishedGenerator())
node = node.add_child(TCPBufferingDisable())
node = node.add_child(TCPBufferingFlush())
node = node.add_child(ExpectChangeCipherSpec())
node = node.add_child(ExpectFinished())
node = node.add_child(ApplicationDataGenerator(b"GET / HTTP/1.0\n\n"))
Expand All @@ -149,81 +169,105 @@ def main():

conversations["sanity - {0}".format(hash_alg)] = conversation

# place SHA-1 sig with SHA-256 indicator
conversation = Connect(host, port)
node = conversation
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
ext = {ExtensionType.signature_algorithms :
SignatureAlgorithmsExtension().create([
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384', 'sha256',
'sha224', 'sha1', 'md5']]),
ExtensionType.signature_algorithms_cert :
SignatureAlgorithmsCertExtension().create(RSA_SIG_ALL)}
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
node = node.add_child(ExpectCertificate())
node = node.add_child(ExpectCertificateRequest())
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(CertificateGenerator(X509CertChain([cert])))
node = node.add_child(ClientKeyExchangeGenerator())
sig_type = (HashAlgorithm.sha1, SignatureAlgorithm.rsa)
msg_type = (HashAlgorithm.sha256, SignatureAlgorithm.rsa)
node = node.add_child(CertificateVerifyGenerator(private_key,
msg_alg=msg_type,
sig_alg=sig_type
))
# the other side can close connection right away, add options to handle it
node.next_sibling = ExpectClose()
node = node.add_child(ChangeCipherSpecGenerator())
node.next_sibling = ExpectClose()
node = node.add_child(FinishedGenerator())
node.next_sibling = ExpectClose()
# we expect closure or Alert and then closure of socket
node = node.add_child(ExpectClose())
node.next_sibling = ExpectAlert()
node.next_sibling.add_child(ExpectClose())
# try valid signatures but with wrong identifiers on TLS level
for name, signature, envelope in [
("SHA-1 signature in SHA-256 envelope", HashAlgorithm.sha1,
HashAlgorithm.sha256),
("SHA-256 signature in SHA-1 envelope", HashAlgorithm.sha256,
HashAlgorithm.sha1),
("SHA-384 signature in SHA-256 envelope", HashAlgorithm.sha384,
HashAlgorithm.sha256)]:
conversation = Connect(host, port)
node = conversation
if dhe:
ciphers = [CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
else:
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
ext = {ExtensionType.signature_algorithms :
SignatureAlgorithmsExtension().create([
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384', 'sha256',
'sha224', 'sha1', 'md5']]),
ExtensionType.signature_algorithms_cert :
SignatureAlgorithmsCertExtension().create(RSA_SIG_ALL)}
if dhe:
ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\
.create([GroupName.secp256r1, GroupName.ffdhe2048])
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
node = node.add_child(ExpectCertificate())
if dhe:
node = node.add_child(ExpectServerKeyExchange())
node = node.add_child(ExpectCertificateRequest())
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(TCPBufferingEnable())
node = node.add_child(CertificateGenerator(X509CertChain([cert])))
node = node.add_child(ClientKeyExchangeGenerator())
sig_type = (signature, SignatureAlgorithm.rsa)
msg_type = (envelope, SignatureAlgorithm.rsa)
node = node.add_child(CertificateVerifyGenerator(private_key,
msg_alg=msg_type,
sig_alg=sig_type
))
node = node.add_child(ChangeCipherSpecGenerator())
node = node.add_child(FinishedGenerator())
node = node.add_child(TCPBufferingDisable())
node = node.add_child(TCPBufferingFlush())
# we expect closure or Alert and then closure of socket
node = node.add_child(ExpectAlert(
description=AlertDescription.decrypt_error))
node.add_child(ExpectClose())

conversations["SHA-1 signature in SHA-256 envelope"] = conversation
conversations[name] = conversation

# because the TLSv1.1 signatures are concatenation of MD5 and SHA1
# implementation that just checks the hash, without verifying the Hash
# Info structure in signature, will accept a TLSv1.1 signature
# in a TLSv1.2 SHA-1 envelope
conversation = Connect(host, port)
node = conversation
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
if dhe:
ciphers = [CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
else:
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
ext = {ExtensionType.signature_algorithms :
SignatureAlgorithmsExtension().create([
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384', 'sha256',
'sha224', 'sha1', 'md5']]),
ExtensionType.signature_algorithms_cert :
SignatureAlgorithmsCertExtension().create(RSA_SIG_ALL)}
if dhe:
ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\
.create([GroupName.secp256r1, GroupName.ffdhe2048])
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
node = node.add_child(ExpectCertificate())
if dhe:
node = node.add_child(ExpectServerKeyExchange())
node = node.add_child(ExpectCertificateRequest())
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(TCPBufferingEnable())
node = node.add_child(CertificateGenerator(X509CertChain([cert])))
node = node.add_child(ClientKeyExchangeGenerator())
msg_alg = (HashAlgorithm.sha1, SignatureAlgorithm.rsa)
node = node.add_child(CertificateVerifyGenerator(private_key,
msg_alg=msg_alg,
sig_version=(3, 2)
))
# the other side can close connection right away, add options to handle it
node.next_sibling = ExpectClose()
node = node.add_child(ChangeCipherSpecGenerator())
node.next_sibling = ExpectClose()
node = node.add_child(FinishedGenerator())
node.next_sibling = ExpectClose()
# we expect closure or Alert and then closure of socket
node = node.add_child(ExpectClose())
node.next_sibling = ExpectAlert()
node.next_sibling.add_child(ExpectClose())
node = node.add_child(TCPBufferingDisable())
node = node.add_child(TCPBufferingFlush())
node = node.add_child(ExpectAlert(
description=AlertDescription.decrypt_error))
node.add_child(ExpectClose())

conversations["TLSv1.1 signature in SHA-1 TLSv1.2 envelope"] = conversation

Expand All @@ -239,7 +283,8 @@ def main():

# make sure that sanity test is run first and last
# to verify that server was running and kept running throughout
sanity_tests = []
sanity_tests = [(k, v) for k, v in conversations.items() if
k.startswith('sanity')]
if run_only:
if num_limit > len(run_only):
num_limit = len(run_only)
Expand Down Expand Up @@ -297,6 +342,11 @@ def main():
failed.append(c_name)

print("CertificateVerify malformed signatures test\n")
print("Note, that if one of the sanity tests fail, like the SHA-1")
print("then the server most likely doesn't allow SHA-1 signatures in")
print("CertificateVerify. If CertificateRequest doesn't list those")
print("algorithms then CertificateVerify with SHA-1 should be rejected")
print("using the illegal_parameter alert\n")

print("Test end")
print(20 * '=')
Expand Down

0 comments on commit a3b886c

Please sign in to comment.