Skip to content

Commit

Permalink
Merge pull request #759 from tlsfuzzer/resume-with-wrong-ciph-dh
Browse files Browse the repository at this point in the history
Resume with wrong ciphers: enable for DH ciphers
  • Loading branch information
tomato42 committed May 25, 2021
2 parents c1b6a3d + f4eac4b commit 17a6e1b
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 40 deletions.
217 changes: 181 additions & 36 deletions scripts/test-resumption-with-wrong-ciphers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,23 @@
from random import sample

from tlslite.constants import CipherSuite, AlertLevel, AlertDescription, \
ExtensionType
ExtensionType, GroupName
from tlsfuzzer.runner import Runner
from tlsfuzzer.messages import Connect, ClientHelloGenerator, \
ClientKeyExchangeGenerator, ChangeCipherSpecGenerator, \
FinishedGenerator, ApplicationDataGenerator, AlertGenerator, \
ResetHandshakeHashes, Close, ResetRenegotiationInfo
from tlsfuzzer.expect import ExpectServerHello, ExpectCertificate, \
ExpectServerHelloDone, ExpectChangeCipherSpec, ExpectFinished, \
ExpectAlert, ExpectClose, ExpectApplicationData
ExpectAlert, ExpectClose, ExpectApplicationData, \
ExpectServerKeyExchange
from tlsfuzzer.utils.lists import natural_sort_keys
from tlsfuzzer.helpers import SIG_ALL
from tlslite.extensions import SupportedGroupsExtension, \
SignatureAlgorithmsExtension, SignatureAlgorithmsCertExtension


version = 3
version = 4


def help_msg():
Expand All @@ -41,7 +45,10 @@ def help_msg():
print(" usage: [-x probe-name] [-X exception], order is compulsory!")
print(" -n num run 'num' or all(if 0) tests instead of default(all)")
print(" (excluding \"sanity\" tests)")
print(" -d negotiate (EC)DHE instead of RSA key exchange")
print(" --swap-ciphers expect the server to pick AES-128 over AES-256")
print(" --n/n-1 expect n/n-1 record splitting (should be used")
print(" for TLS 1.0 and earlier only)")
print(" --help this message")


Expand All @@ -54,9 +61,12 @@ def main():
expected_failures = {}
last_exp_tmp = None
swap_ciphers = False
dhe = False
splitting = False

argv = sys.argv[1:]
opts, args = getopt.getopt(argv, "h:p:e:x:X:n:", ["help", "swap-ciphers"])
opts, args = getopt.getopt(argv, "h:p:e:x:X:n:d", ["help", "swap-ciphers",
"n/n-1"])
for opt, arg in opts:
if opt == '-h':
host = arg
Expand All @@ -75,6 +85,10 @@ def main():
num_limit = int(arg)
elif opt == "--swap-ciphers":
swap_ciphers = True
elif opt == '-d':
dhe = True
elif opt == "--n/n-1":
splitting = True
elif opt == '--help':
help_msg()
sys.exit(0)
Expand All @@ -90,20 +104,40 @@ def main():

conversation = Connect(host, port)
node = conversation
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
node = node.add_child(ClientHelloGenerator(ciphers))
if dhe:
ext = {}
groups = [GroupName.secp256r1,
GroupName.ffdhe2048]
ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\
.create(groups)
ext[ExtensionType.signature_algorithms] = \
SignatureAlgorithmsExtension().create(SIG_ALL)
ext[ExtensionType.signature_algorithms_cert] = \
SignatureAlgorithmsCertExtension().create(SIG_ALL)
ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
else:
ext = None
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello())
node = node.add_child(ExpectCertificate())
if dhe:
node = node.add_child(ExpectServerKeyExchange())
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(ClientKeyExchangeGenerator())
node = node.add_child(ChangeCipherSpecGenerator())
node = node.add_child(FinishedGenerator())
node = node.add_child(ExpectChangeCipherSpec())
node = node.add_child(ExpectFinished())
node = node.add_child(ApplicationDataGenerator(
bytearray(b"GET / HTTP/1.0\n\n")))
bytearray(b"GET / HTTP/1.0\r\n\r\n")))
node = node.add_child(ExpectApplicationData())
if splitting:
node = node.add_child(ExpectApplicationData())
node = node.add_child(AlertGenerator(AlertLevel.warning,
AlertDescription.close_notify))
node = node.add_child(ExpectAlert())
Expand All @@ -112,20 +146,40 @@ def main():

conversation = Connect(host, port)
node = conversation
ciphers = [CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
node = node.add_child(ClientHelloGenerator(ciphers))
if dhe:
ext = {}
groups = [GroupName.secp256r1,
GroupName.ffdhe2048]
ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\
.create(groups)
ext[ExtensionType.signature_algorithms] = \
SignatureAlgorithmsExtension().create(SIG_ALL)
ext[ExtensionType.signature_algorithms_cert] = \
SignatureAlgorithmsCertExtension().create(SIG_ALL)
ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
else:
ext = None
ciphers = [CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello())
node = node.add_child(ExpectCertificate())
if dhe:
node = node.add_child(ExpectServerKeyExchange())
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(ClientKeyExchangeGenerator())
node = node.add_child(ChangeCipherSpecGenerator())
node = node.add_child(FinishedGenerator())
node = node.add_child(ExpectChangeCipherSpec())
node = node.add_child(ExpectFinished())
node = node.add_child(ApplicationDataGenerator(
bytearray(b"GET / HTTP/1.0\n\n")))
bytearray(b"GET / HTTP/1.0\r\n\r\n")))
node = node.add_child(ExpectApplicationData())
if splitting:
node = node.add_child(ExpectApplicationData())
node = node.add_child(AlertGenerator(AlertLevel.warning,
AlertDescription.close_notify))
node = node.add_child(ExpectAlert())
Expand All @@ -134,16 +188,46 @@ def main():

conversation = Connect(host, port)
node = conversation
ciphers = [CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA]
node = node.add_child(ClientHelloGenerator(
ciphers,
extensions={ExtensionType.renegotiation_info:None}))
ext = {ExtensionType.renegotiation_info: None}
if dhe:
groups = [GroupName.secp256r1,
GroupName.ffdhe2048]
ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\
.create(groups)
ext[ExtensionType.signature_algorithms] = \
SignatureAlgorithmsExtension().create(SIG_ALL)
ext[ExtensionType.signature_algorithms_cert] = \
SignatureAlgorithmsCertExtension().create(SIG_ALL)
ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA]
else:
ciphers = [CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA]
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
if dhe:
if swap_ciphers:
exp_ciph = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA]
else:
exp_ciph = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_256_CBC_SHA]
else:
if swap_ciphers:
exp_ciph = CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA
else:
exp_ciph = CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA
node = node.add_child(ExpectServerHello(
cipher=CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA if swap_ciphers else
CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
cipher=exp_ciph,
extensions={ExtensionType.renegotiation_info:None}))
node = node.add_child(ExpectCertificate())
if dhe:
node = node.add_child(ExpectServerKeyExchange())
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(ClientKeyExchangeGenerator())
node = node.add_child(ChangeCipherSpecGenerator())
Expand Down Expand Up @@ -175,6 +259,8 @@ def main():
node = node.add_child(ApplicationDataGenerator(
bytearray(b"GET / HTTP/1.0\n\n")))
node = node.add_child(ExpectApplicationData())
if splitting:
node = node.add_child(ExpectApplicationData())
node = node.add_child(AlertGenerator(AlertLevel.warning,
AlertDescription.close_notify))
node = node.add_child(ExpectAlert())
Expand All @@ -186,16 +272,46 @@ def main():
# check if resumption with a different cipher fails
conversation = Connect(host, port)
node = conversation
ciphers = [CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA]
node = node.add_child(ClientHelloGenerator(
ciphers,
extensions={ExtensionType.renegotiation_info:None}))
ext = {ExtensionType.renegotiation_info: None}
if dhe:
groups = [GroupName.secp256r1,
GroupName.ffdhe2048]
ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\
.create(groups)
ext[ExtensionType.signature_algorithms] = \
SignatureAlgorithmsExtension().create(SIG_ALL)
ext[ExtensionType.signature_algorithms_cert] = \
SignatureAlgorithmsCertExtension().create(SIG_ALL)
ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA]
else:
ciphers = [CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA]
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
if dhe:
if swap_ciphers:
exp_ciph = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA]
else:
exp_ciph = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_256_CBC_SHA]
else:
if swap_ciphers:
exp_ciph = CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA
else:
exp_ciph = CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA
node = node.add_child(ExpectServerHello(
cipher=CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA if swap_ciphers else
CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
cipher=exp_ciph,
extensions={ExtensionType.renegotiation_info:None}))
node = node.add_child(ExpectCertificate())
if dhe:
node = node.add_child(ExpectServerKeyExchange())
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(ClientKeyExchangeGenerator())
node = node.add_child(ChangeCipherSpecGenerator())
Expand All @@ -214,13 +330,23 @@ def main():

node = node.add_child(ResetHandshakeHashes())
node = node.add_child(ResetRenegotiationInfo())
if swap_ciphers:
ciphers = [CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA]
if dhe:
if swap_ciphers:
ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_256_CBC_SHA]
else:
ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA]
else:
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA]
if swap_ciphers:
ciphers = [CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA]
else:
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA]
node = node.add_child(ClientHelloGenerator(
ciphers,
extensions={ExtensionType.renegotiation_info:None}))
extensions=ext))
node = node.add_child(ExpectAlert(AlertLevel.fatal,
AlertDescription.illegal_parameter))
node.add_child(ExpectClose())
Expand All @@ -229,13 +355,27 @@ def main():

conversation = Connect(host, port)
node = conversation
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA]
node = node.add_child(ClientHelloGenerator(
ciphers,
extensions={ExtensionType.renegotiation_info:None}))
ext = {ExtensionType.renegotiation_info: None}
if dhe:
groups = [GroupName.secp256r1,
GroupName.ffdhe2048]
ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\
.create(groups)
ext[ExtensionType.signature_algorithms] = \
SignatureAlgorithmsExtension().create(SIG_ALL)
ext[ExtensionType.signature_algorithms_cert] = \
SignatureAlgorithmsCertExtension().create(SIG_ALL)
ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA]
else:
ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA]
node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext))
node = node.add_child(ExpectServerHello(
extensions={ExtensionType.renegotiation_info:None}))
node = node.add_child(ExpectCertificate())
if dhe:
node = node.add_child(ExpectServerKeyExchange())
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(ClientKeyExchangeGenerator())
node = node.add_child(ChangeCipherSpecGenerator())
Expand All @@ -254,10 +394,15 @@ def main():

node = node.add_child(ResetHandshakeHashes())
node = node.add_child(ResetRenegotiationInfo())
ciphers = [CipherSuite.TLS_RSA_WITH_NULL_SHA]
if dhe:
ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_NULL_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_NULL_SHA,
CipherSuite.TLS_ECDH_ANON_WITH_NULL_SHA]
else:
ciphers = [CipherSuite.TLS_RSA_WITH_NULL_SHA]
node = node.add_child(ClientHelloGenerator(
ciphers,
extensions={ExtensionType.renegotiation_info:None}))
extensions=ext))
node = node.add_child(ExpectAlert(AlertLevel.fatal,
AlertDescription.illegal_parameter))
node.add_child(ExpectClose())
Expand Down
36 changes: 36 additions & 0 deletions tests/test_tlsfuzzer_expect.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,42 @@ def test_is_match_with_empty_message(self):

self.assertFalse(ret)

def test__cmp_eq_or_in(self):
ret = ExpectHandshake._cmp_eq_or_in([2, 3, 4], 3)

self.assertIsNone(ret)

def test__cmp_eq_or_in_with_None(self):
ret = ExpectHandshake._cmp_eq_or_in(None, 3)

self.assertIsNone(ret)

def test__cmp_eq_or_in_not_matching(self):
with self.assertRaises(AssertionError) as e:
ExpectHandshake._cmp_eq_or_in([2, 3, 4], 1)

self.assertIn("[2, 3, 4]", str(e.exception))
self.assertIn("not in expected", str(e.exception))
self.assertIn("1", str(e.exception))

def test__cmp_eq_or_in_mismatch_with_type(self):
with self.assertRaises(AssertionError) as e:
ExpectHandshake._cmp_eq_or_in(
[HandshakeType.client_hello,
HandshakeType.server_hello],
HandshakeType.server_key_exchange,
field_type=HandshakeType)

self.assertIn("client_hello, server_hello", str(e.exception))
self.assertIn("server_key_exchange", str(e.exception))

def test__cmp_eq_or_in_mismatch_with_format_string(self):
with self.assertRaises(AssertionError) as e:
ExpectHandshake._cmp_eq_or_in([2, 3], 1,
f_str="our: {0}, ext: {1}")

self.assertIn("our: [2, 3], ext: 1", str(e.exception))

def test__cmp_eq_list_no_type(self):
ret = ExpectHandshake._cmp_eq_list((1, 2), (1, 2))

Expand Down
2 changes: 2 additions & 0 deletions tests/tlslite-ng-random-subset.json
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@
"-k", "tests/serverX509Key.pem",
"-c", "tests/serverX509Cert.pem"]},
{"name" : "test-resumption-with-wrong-ciphers.py"},
{"name" : "test-resumption-with-wrong-ciphers.py",
"arguments" : ["-d"]},
{"name" : "test-serverhello-random.py",
"arguments" : ["-x", "Protocol (3, 0)", "-X", "protocol_version",
"-x", "Protocol (3, 0) in SSLv2 compatible ClientHello",
Expand Down

0 comments on commit 17a6e1b

Please sign in to comment.