Skip to content

Commit

Permalink
Merge pull request #639 from inikolcev/fix-malformed-cert-script
Browse files Browse the repository at this point in the history
Add aditional conversations and also expect BadCertificate in some cases
  • Loading branch information
tomato42 committed Feb 18, 2020
2 parents 64a7668 + 2b45619 commit 0fe3608
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 28 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ You'll need:

* Python 2.6 or later or Python 3.3 or later
* [tlslite-ng](https://github.com/tomato42/tlslite-ng)
0.8.0-alpha36 or later (note that `tlslite` will *not* work and
0.8.0-alpha37 or later (note that `tlslite` will *not* work and
they conflict with each other)
* [ecdsa](https://github.com/warner/python-ecdsa)
python module (dependency of tlslite-ng, should get installed
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
tlslite-ng>=0.8.0-alpha36
tlslite-ng>=0.8.0-alpha37
164 changes: 138 additions & 26 deletions scripts/test-certificate-malformed.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def main():
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384', 'sha256',
'sha224', 'sha1', 'md5']]),
ExtensionType.signature_algorithms_cert :
ExtensionType.signature_algorithms_cert:
SignatureAlgorithmsCertExtension().create(RSA_SIG_ALL)}
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
Expand Down Expand Up @@ -140,7 +140,7 @@ def main():
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384', 'sha256',
'sha224', 'sha1', 'md5']]),
ExtensionType.signature_algorithms_cert :
ExtensionType.signature_algorithms_cert:
SignatureAlgorithmsCertExtension().create(RSA_SIG_ALL)}
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
Expand Down Expand Up @@ -172,7 +172,7 @@ def main():
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384', 'sha256',
'sha224', 'sha1', 'md5']]),
ExtensionType.signature_algorithms_cert :
ExtensionType.signature_algorithms_cert:
SignatureAlgorithmsCertExtension().create(RSA_SIG_ALL)}
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
Expand Down Expand Up @@ -204,7 +204,7 @@ def main():
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384', 'sha256',
'sha224', 'sha1', 'md5']]),
ExtensionType.signature_algorithms_cert :
ExtensionType.signature_algorithms_cert:
SignatureAlgorithmsCertExtension().create(RSA_SIG_ALL)}
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
Expand All @@ -220,8 +220,12 @@ def main():
node = node.add_child(FinishedGenerator())
node = node.add_child(TCPBufferingDisable())
node = node.add_child(TCPBufferingFlush())
# If the lenght of the certificates is inconsitent, we expect decode_error.
# If the lenght seems fine for that cert(they are parsed one by one), but
# the payload is wrong, it will fail with bad_certificate.
node = node.add_child(ExpectAlert(AlertLevel.fatal,
AlertDescription.decode_error))
(AlertDescription.decode_error,
AlertDescription.bad_certificate)))
node = node.add_child(ExpectClose())

conversations["fuzz first certificate length with {0}".format(i)] = conversation
Expand All @@ -237,7 +241,7 @@ def main():
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384', 'sha256',
'sha224', 'sha1', 'md5']]),
ExtensionType.signature_algorithms_cert :
ExtensionType.signature_algorithms_cert:
SignatureAlgorithmsCertExtension().create(RSA_SIG_ALL)}
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
Expand Down Expand Up @@ -269,17 +273,14 @@ def main():
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384', 'sha256',
'sha224', 'sha1', 'md5']]),
ExtensionType.signature_algorithms_cert :
ExtensionType.signature_algorithms_cert:
SignatureAlgorithmsCertExtension().create(RSA_SIG_ALL)}
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
node = node.add_child(ExpectCertificate())
node = node.add_child(ExpectCertificateRequest())
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(TCPBufferingEnable())
# technically, it's an invalid encoding as the items are defined as
# <1..2^24-1>, but some implementations, like OpenSSL, accept it;
# don't be too precise for now
msg = CertificateGenerator()
msg = pad_handshake(msg, 3)
msg = fuzz_message(msg, substitutions={-4:3})
Expand All @@ -289,19 +290,10 @@ def main():
node = node.add_child(FinishedGenerator())
node = node.add_child(TCPBufferingDisable())
node = node.add_child(TCPBufferingFlush())
node = node.add_child(ExpectChangeCipherSpec())
# if the implementation is proper, it will reject the message
node.next_sibling = ExpectAlert(AlertLevel.fatal,
AlertDescription.decode_error)
node.next_sibling.add_child(ExpectClose())
node = node.add_child(ExpectFinished())
node = node.add_child(ApplicationDataGenerator(b"GET / HTTP/1.0\n\n"))
node = node.add_child(ExpectApplicationData())
node = node.add_child(AlertGenerator(AlertDescription.close_notify))
node = node.add_child(ExpectAlert(AlertLevel.fatal,
AlertDescription.decode_error))
node = node.add_child(ExpectClose())
node.next_sibling = ExpectAlert()
node.next_sibling.add_child(ExpectClose())

conversations["sanity - empty client cert"] = conversation

# fuzz empty certificate message
Expand All @@ -318,7 +310,7 @@ def main():
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384', 'sha256',
'sha224', 'sha1', 'md5']]),
ExtensionType.signature_algorithms_cert :
ExtensionType.signature_algorithms_cert:
SignatureAlgorithmsCertExtension().create(RSA_SIG_ALL)}
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
Expand All @@ -327,9 +319,14 @@ def main():
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(TCPBufferingEnable())
msg = CertificateGenerator()
# extend the handshake message to allow for modifying fictional
# certificate_list
msg = pad_handshake(msg, k)
# change the overall length of the certificate_list
subs = {6: i}
if k >= 3:
# if there's payload past certificate_list length tag
# set the byte that specifies length of first certificate
subs[9] = j
msg = fuzz_message(msg, substitutions=subs)
node = node.add_child(msg)
Expand All @@ -338,13 +335,128 @@ def main():
node = node.add_child(FinishedGenerator())
node = node.add_child(TCPBufferingDisable())
node = node.add_child(TCPBufferingFlush())
node = node.add_child(ExpectAlert(AlertLevel.fatal,
AlertDescription.decode_error))
# when the certificate is just a string of null bytes, it's a bad
# certificate
# (all length tags are 3 byte long)
if i == j + 3 and k + 3 == i + 3 and j > 0:
node = node.add_child(ExpectAlert(AlertLevel.fatal,
AlertDescription.bad_certificate))
else:
# If the lenght of the certificates is inconsitent, we expect decode_error.
# If the lenght seems fine for that cert(they are parsed one by one), but
# the payload is wrong, it will fail with bad_certificate.
node = node.add_child(ExpectAlert(AlertLevel.fatal,
(AlertDescription.decode_error,
AlertDescription.bad_certificate)))
node = node.add_child(ExpectClose())

conversations["fuzz empty certificate - overall {2}, certs {0}, cert {1}"
.format(i, j, k + 3)] = conversation

conversation = Connect(host, port)
node = conversation
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
ext = {ExtensionType.signature_algorithms:
SignatureAlgorithmsExtension().create([
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384', 'sha256',
'sha224', 'sha1', 'md5']]),
ExtensionType.signature_algorithms_cert:
SignatureAlgorithmsCertExtension().create(RSA_SIG_ALL)}
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
node = node.add_child(ExpectCertificate())
node = node.add_child(ExpectCertificateRequest())
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(TCPBufferingEnable())
empty_cert = X509()
msg = CertificateGenerator(X509CertChain([cert, empty_cert]))
node = node.add_child(msg)
node = node.add_child(ClientKeyExchangeGenerator())
node = node.add_child(ChangeCipherSpecGenerator())
node = node.add_child(FinishedGenerator())
node = node.add_child(TCPBufferingDisable())
node = node.add_child(TCPBufferingFlush())
node = node.add_child(ExpectAlert(AlertLevel.fatal,
AlertDescription.decode_error))
node = node.add_child(ExpectClose())

conversations["Correct cert followed by an empty one"] = conversation

# fuzz the lenght of the second certificate which is empty
for i in range(1, 0x100):
conversation = Connect(host, port)
node = conversation
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
ext = {ExtensionType.signature_algorithms:
SignatureAlgorithmsExtension().create([
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384',
'sha256', 'sha224',
'sha1', 'md5']])}
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
node = node.add_child(ExpectCertificate())
node = node.add_child(ExpectCertificateRequest())
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(TCPBufferingEnable())
empty_cert = X509()
msg = CertificateGenerator(X509CertChain([cert, empty_cert]))
node = node.add_child(msg)
node = node.add_child(fuzz_message(msg, xors={-1: i}))
node = node.add_child(ClientKeyExchangeGenerator())
node = node.add_child(ChangeCipherSpecGenerator())
node = node.add_child(FinishedGenerator())
node = node.add_child(TCPBufferingDisable())
node = node.add_child(TCPBufferingFlush())
node = node.add_child(ExpectAlert(AlertLevel.fatal,
AlertDescription.decode_error))
node = node.add_child(ExpectClose())

conversations["fuzz second certificate(empty) length - {0}"
.format(i)] = conversation

# Correct cert followed by 2 empty certs. Fuzz the length of the middle(2nd) cert
for i in range(1, 0x100):
conversation = Connect(host, port)
node = conversation
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
ext = {ExtensionType.signature_algorithms:
SignatureAlgorithmsExtension().create([
(getattr(HashAlgorithm, x),
SignatureAlgorithm.rsa) for x in ['sha512', 'sha384',
'sha256', 'sha224',
'sha1', 'md5']])}
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(version=(3, 3)))
node = node.add_child(ExpectCertificate())
node = node.add_child(ExpectCertificateRequest())
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(TCPBufferingEnable())
empty_cert = X509()
msg = CertificateGenerator(X509CertChain([cert, empty_cert]))
msg = pad_handshake(msg, 3)
node = node.add_child(msg)
node = node.add_child(fuzz_message(msg, xors={-4: i}))
node = node.add_child(ClientKeyExchangeGenerator())
node = node.add_child(ChangeCipherSpecGenerator())
node = node.add_child(FinishedGenerator())
node = node.add_child(TCPBufferingDisable())
node = node.add_child(TCPBufferingFlush())
# If the lenght of the certificates is inconsitent, we expect decode_error.
# If the lenght seems fine for that cert(they are parsed one by one), but
# the payload is wrong, it will fail with bad_certificate.
node = node.add_child(ExpectAlert(AlertLevel.fatal,
(AlertDescription.decode_error,
AlertDescription.bad_certificate)))
node = node.add_child(ExpectClose())

conversations["fuzz middle certificate(empty) length - {0}"
.format(i)] = conversation

# run the conversation
good = 0
bad = 0
Expand All @@ -357,7 +469,6 @@ def main():
shuffled_tests = sample(regular_tests, len(regular_tests))
ordered_tests = chain(sanity_tests, shuffled_tests, sanity_tests)


for c_name, c_test in ordered_tests:
if run_only and c_name not in run_only or c_name in run_exclude:
continue
Expand All @@ -380,7 +491,7 @@ def main():
bad += 1
failed.append(c_name)

print("Malformed Certificate test version 1\n")
print("Malformed Certificate test version 2\n")
print("Test end")
print("successful: {0}".format(good))
print("failed: {0}".format(bad))
Expand All @@ -390,5 +501,6 @@ def main():
if bad > 0:
sys.exit(1)


if __name__ == "__main__":
main()

0 comments on commit 0fe3608

Please sign in to comment.