Skip to content

Commit

Permalink
Merge pull request #789 from tlsfuzzer/dhe-key-share
Browse files Browse the repository at this point in the history
DHE key share sizes
  • Loading branch information
tomato42 committed May 10, 2022
2 parents dbb0458 + b7c9c35 commit b832247
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 16 deletions.
72 changes: 56 additions & 16 deletions scripts/test-dhe-no-shared-secret-padding.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Author: Hubert Kario, (c) 2018
# Author: Hubert Kario, (c) 2018-2022
# Released under Gnu GPL v2.0, see LICENSE file for details

"""Test for correct handling of short DHE shared secret."""
Expand All @@ -22,11 +22,14 @@
from tlsfuzzer.utils.lists import natural_sort_keys

from tlslite.constants import CipherSuite, AlertLevel, AlertDescription, \
ExtensionType
ExtensionType, GroupName
from tlslite.extensions import SupportedGroupsExtension, \
SignatureAlgorithmsExtension, SignatureAlgorithmsCertExtension
from tlslite.utils.cryptomath import numBytes
from tlsfuzzer.helpers import SIG_ALL


version = 4
version = 6


def help_msg():
Expand All @@ -50,11 +53,13 @@ def help_msg():
print(" shared secret for test case to be valid,")
print(" 1 by default")
print(" -z don't expect 1/n-1 record split in TLS1.0")
print(" --extra-exts Send additional extensions to advertise support for")
print(" stronger primes and signatures")
print(" --help this message")


def main():
"""Verify correct DHE shared secret handling."""
"""Verify correct DHE shared secret and key share handling."""
host = "localhost"
port = 4433
num_limit = 1
Expand All @@ -63,9 +68,11 @@ def main():
last_exp_tmp = None
min_zeros = 1
record_split = True
extra_exts = False

argv = sys.argv[1:]
opts, args = getopt.getopt(argv, "h:p:e:x:X:n:z", ["help", "min-zeros="])
opts, args = getopt.getopt(argv, "h:p:e:x:X:n:z", ["help", "min-zeros=",
"extra-exts"])
for opt, arg in opts:
if opt == '-h':
host = arg
Expand All @@ -84,6 +91,8 @@ def main():
num_limit = int(arg)
elif opt == '-z':
record_split = False
elif opt == '--extra-exts':
extra_exts = True
elif opt == '--help':
help_msg()
sys.exit(0)
Expand All @@ -99,27 +108,40 @@ def main():

collected_premaster_secrets = []
collected_dh_primes = []
collected_client_key_shares = []
variables_check = \
{'premaster_secret':
collected_premaster_secrets,
'ServerKeyExchange.dh_p':
collected_dh_primes}
collected_dh_primes,
'ClientKeyExchange.dh_Yc':
collected_client_key_shares}

conversations = {}

conversation = Connect(host, port)
node = conversation
exts = {}
exts[ExtensionType.renegotiation_info] = None
if extra_exts:
exts[ExtensionType.supported_groups] = SupportedGroupsExtension()\
.create([GroupName.ffdhe2048, GroupName.ffdhe3072,
GroupName.ffdhe4096])
exts[ExtensionType.signature_algorithms_cert] = \
SignatureAlgorithmsCertExtension().create(SIG_ALL)
exts[ExtensionType.signature_algorithms] = \
SignatureAlgorithmsExtension().create(SIG_ALL)
ciphers = [CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA]
node = node.add_child(ClientHelloGenerator(
ciphers,
extensions={ExtensionType.renegotiation_info:None}))
extensions=exts))
node = node.add_child(ExpectServerHello(
extensions={ExtensionType.renegotiation_info:None}))
node = node.add_child(ExpectCertificate())
node = node.add_child(ExpectServerKeyExchange())
node = node.add_child(CopyVariables(variables_check))
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(ClientKeyExchangeGenerator())
node = node.add_child(CopyVariables(variables_check))
node = node.add_child(ChangeCipherSpecGenerator())
node = node.add_child(FinishedGenerator())
node = node.add_child(ExpectChangeCipherSpec())
Expand All @@ -139,9 +161,13 @@ def main():
conversation = Connect(host, port,
version=(0, 2) if ssl2 else (3, 0))
node = conversation
ciphers = [CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
if ssl2:
ciphers = [CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
else:
ciphers = [CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA]
node = node.add_child(ClientHelloGenerator(ciphers,
extensions=exts,
version=prot,
ssl2=ssl2))
if prot > (3, 0):
Expand All @@ -152,9 +178,9 @@ def main():
version=prot))
node = node.add_child(ExpectCertificate())
node = node.add_child(ExpectServerKeyExchange())
node = node.add_child(CopyVariables(variables_check))
node = node.add_child(ExpectServerHelloDone())
node = node.add_child(ClientKeyExchangeGenerator())
node = node.add_child(CopyVariables(variables_check))
node = node.add_child(ChangeCipherSpecGenerator())
node = node.add_child(FinishedGenerator())
node = node.add_child(ExpectChangeCipherSpec())
Expand Down Expand Up @@ -204,15 +230,18 @@ def main():
continue
i = 0
break_loop = False
break_loop_clnt = False
while True:
# don't hog the memory unnecessairly
collected_dh_primes[:] = []
collected_premaster_secrets[:] = []
collected_client_key_shares[:] = []

print("\"{1}\" repeat {0}...".format(i, c_name))
i += 1
if c_name == "sanity":
break_loop = True
break_loop_clnt = True

runner = Runner(c_test)

Expand Down Expand Up @@ -246,25 +275,36 @@ def main():
if res:
good += 1
if numBytes(collected_dh_primes[-1]) \
>= len(collected_premaster_secrets[-1]) + min_zeros:
print("Got prime {0} bytes long and a premaster_secret "
"{1} bytes long"
>= len(collected_premaster_secrets[-1]) \
+ min_zeros:
print("Got prime {0} bytes long and a premaster_secret"
" {1} bytes long"
.format(numBytes(collected_dh_primes[-1]),
len(collected_premaster_secrets[-1])))
break_loop = True
if numBytes(collected_dh_primes[-1]) \
>= numBytes(collected_client_key_shares[-1]) + \
min_zeros:
print("Got prime {0} bytes long and a client "
"key share {1} bytes long"
.format(
numBytes(collected_dh_primes[-1]),
numBytes(collected_client_key_shares[-1])))
break_loop_clnt = True
print("OK\n")
else:
bad += 1
failed.append(c_name)
break
if break_loop:
if break_loop and break_loop_clnt:
break


print('')

print("Check if the calculated DHE pre_master_secret is truncated when")
print("there are zeros on most significant bytes")
print("there are zeros on most significant bytes, and that server")
print("accepts a client key share when it does the same")

print("Test end")
print(20 * '=')
Expand Down
3 changes: 3 additions & 0 deletions tlsfuzzer/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ class CopyVariables(Command):
``master_secret``, ``ServerHello.extensions.key_share.key_exchange``,
``server handshake traffic secret``, ``exporter master secret``,
``ServerKeyExchange.key_share``, ``ServerKeyExchange.dh_p``,
``ClientKeyExchange.dh_Yc``, ``ClientKeyExchange.ecdh_Yc``,
``DH shared secret``, ``PSK secret``, ``client_verify_data``,
``server_verify_data``, ``client application traffic secret``,
``server application traffic secret``,
Expand Down Expand Up @@ -809,12 +810,14 @@ def generate(self, status):
self.version).createDH(ske.dh_p-1)
else:
cke = status.key_exchange.makeClientKeyExchange()
status.key['ClientKeyExchange.dh_Yc'] = cke.dh_Yc
elif self.cipher in CipherSuite.ecdhAllSuites:
if self.ecdh_Yc is not None:
cke = ClientKeyExchange(self.cipher,
self.version).createECDH(self.ecdh_Yc)
else:
cke = status.key_exchange.makeClientKeyExchange()
status.key['ClientKeyExchange.ecdh_Yc'] = cke.ecdh_Yc
else:
raise AssertionError("Unknown cipher/key exchange type")

Expand Down

0 comments on commit b832247

Please sign in to comment.