# Source: test-dhe-no-shared-secret-padding.py
# (forked from tlsfuzzer/tlsfuzzer; 236 lines, 205 loc, 8.74 KB)
# Author: Hubert Kario, (c) 2018
# Released under Gnu GPL v2.0, see LICENSE file for details
"""Test for correct handling of short DHE shared secret."""
from __future__ import print_function
import traceback
import sys
import getopt
from itertools import chain
from random import sample
from tlsfuzzer.runner import Runner
from tlsfuzzer.messages import Connect, ClientHelloGenerator, \
ClientKeyExchangeGenerator, ChangeCipherSpecGenerator, \
FinishedGenerator, ApplicationDataGenerator, AlertGenerator, \
CopyVariables
from tlsfuzzer.expect import ExpectServerHello, ExpectCertificate, \
ExpectServerHelloDone, ExpectChangeCipherSpec, ExpectFinished, \
ExpectAlert, ExpectClose, ExpectServerKeyExchange, \
ExpectApplicationData
from tlsfuzzer.utils.lists import natural_sort_keys
from tlslite.constants import CipherSuite, AlertLevel, AlertDescription, \
ExtensionType
from tlslite.utils.cryptomath import numBytes
# Version of this test script; main() prints it in the final summary.
version = 1
def help_msg():
    """Print the command line usage summary of this script to stdout."""
    # One entry per output line; kept as data so the text is easy to scan.
    usage_lines = (
        "Usage: <script-name> [-h hostname] [-p port] [[probe-name] ...]",
        " -h hostname name of the host to run the test against",
        " localhost by default",
        " -p port port number to use for connection, 4433 by default",
        " probe-name if present, will run only the probes with given",
        " names and not all of them, e.g \"sanity\"",
        " -e probe-name exclude the probe from the list of the ones run",
        " may be specified multiple times",
        " -n num only run `num` random tests instead of a full set",
        " (excluding \"sanity\" tests)",
        " --min-zeros m minimal number of zeros that have to be cut from",
        " shared secret for test case to be valid,",
        " 1 by default",
        " -z disable 1/n-1 record split for TLS1.0",
        " --help this message",
    )
    for text in usage_lines:
        print(text)
def main():
    """Verify correct DHE shared secret handling.

    Parses the command line options, builds one "sanity" conversation plus
    one conversation per protocol version (each with and without an SSLv2
    compatible ClientHello) and runs them against ``host:port``.

    Every non-sanity conversation is repeated until a handshake produces a
    premaster secret that is at least ``min_zeros`` bytes shorter than the
    DH prime, or until a run fails.  A summary is printed at the end and the
    process exits with status 1 if any run failed.
    """
    host = "localhost"
    port = 4433
    num_limit = None
    run_exclude = set()
    min_zeros = 1
    record_split = True

    opts, args = getopt.getopt(sys.argv[1:], "h:p:e:n:z",
                               ["help", "min-zeros="])
    for opt, arg in opts:
        if opt == '--help':
            help_msg()
            sys.exit(0)
        elif opt == '-h':
            host = arg
        elif opt == '-p':
            port = int(arg)
        elif opt == '-e':
            run_exclude.add(arg)
        elif opt == '-n':
            num_limit = int(arg)
        elif opt == '-z':
            record_split = False
        elif opt == '--min-zeros':
            min_zeros = int(arg)
        else:
            raise ValueError("Unknown option: {0}".format(opt))

    # positional arguments, when given, name the only probes to run
    run_only = set(args) if args else None

    # CopyVariables stores the values seen during a handshake in these
    # lists; they are inspected after every successful run
    collected_premaster_secrets = []
    collected_dh_primes = []
    variables_check = {
        'premaster_secret': collected_premaster_secrets,
        'ServerKeyExchange.dh_p': collected_dh_primes,
    }

    conversations = {}

    # "sanity": plain DHE handshake followed by an HTTP request
    root = Connect(host, port)
    sanity_steps = [
        ClientHelloGenerator(
            [CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA],
            extensions={ExtensionType.renegotiation_info: None}),
        ExpectServerHello(
            extensions={ExtensionType.renegotiation_info: None}),
        ExpectCertificate(),
        ExpectServerKeyExchange(),
        CopyVariables(variables_check),
        ExpectServerHelloDone(),
        ClientKeyExchangeGenerator(),
        ChangeCipherSpecGenerator(),
        FinishedGenerator(),
        ExpectChangeCipherSpec(),
        ExpectFinished(),
        ApplicationDataGenerator(bytearray(b"GET / HTTP/1.0\n\n")),
        ExpectApplicationData(),
        AlertGenerator(AlertLevel.warning, AlertDescription.close_notify),
        ExpectAlert(),
    ]
    tail = root
    for step in sanity_steps:
        tail = tail.add_child(step)
    tail.next_sibling = ExpectClose()
    conversations["sanity"] = root

    # the same exchange for every protocol version, once with an SSLv2
    # compatible ClientHello and once with a regular one
    for prot in [(3, 0), (3, 1), (3, 2), (3, 3)]:
        for ssl2 in [True, False]:
            if ssl2:
                record_version = (0, 2)
            else:
                record_version = (3, 0)
            root = Connect(host, port, version=record_version)

            if prot > (3, 0):
                ext = {ExtensionType.renegotiation_info: None}
            else:
                ext = None

            steps = [
                ClientHelloGenerator(
                    [CipherSuite.TLS_DHE_RSA_WITH_AES_128_CBC_SHA,
                     CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV],
                    version=prot,
                    ssl2=ssl2),
                ExpectServerHello(extensions=ext, version=prot),
                ExpectCertificate(),
                ExpectServerKeyExchange(),
                CopyVariables(variables_check),
                ExpectServerHelloDone(),
                ClientKeyExchangeGenerator(),
                ChangeCipherSpecGenerator(),
                FinishedGenerator(),
                ExpectChangeCipherSpec(),
                ExpectFinished(),
                ApplicationDataGenerator(
                    bytearray(b"GET / HTTP/1.0\n\n")),
                ExpectApplicationData(),
            ]
            if prot < (3, 2) and record_split:
                # 1/n-1 record splitting: the reply arrives in two records
                steps.append(ExpectApplicationData())
            steps.append(AlertGenerator(AlertLevel.warning,
                                        AlertDescription.close_notify))
            steps.append(ExpectAlert())

            tail = root
            for step in steps:
                tail = tail.add_child(step)
            tail.next_sibling = ExpectClose()

            if ssl2:
                suffix = " in SSLv2 compatible ClientHello"
            else:
                suffix = ""
            conversations["Protocol {0}{1}".format(prot, suffix)] = root

    # run the conversations
    good = 0
    bad = 0
    failed = []
    num_limit = num_limit or len(conversations)

    # the sanity test is run first and last to verify that the server was
    # running and kept running throughout
    sanity_tests = [('sanity', conversations['sanity'])]
    regular_tests = [(name, conv) for name, conv in conversations.items()
                     if name != 'sanity']
    sampled_tests = sample(regular_tests,
                           min(num_limit, len(regular_tests)))
    ordered_tests = chain(sanity_tests, sampled_tests, sanity_tests)

    for c_name, c_test in ordered_tests:
        if (run_only and c_name not in run_only) or c_name in run_exclude:
            continue

        attempt = 0
        while True:
            # don't hog the memory unnecessarily; cleared in place because
            # variables_check refers to these very list objects
            del collected_dh_primes[:]
            del collected_premaster_secrets[:]

            print("\"{1}\" repeat {0}...".format(attempt, c_name))
            attempt += 1

            # the sanity conversation is executed once per occurrence, the
            # others repeat until a short enough premaster secret shows up
            done = c_name == "sanity"

            runner = Runner(c_test)
            try:
                runner.run()
            except Exception:
                print("Error while processing")
                print(traceback.format_exc())
                bad += 1
                failed.append(c_name)
                break

            good += 1
            prime_len = numBytes(collected_dh_primes[-1])
            secret_len = len(collected_premaster_secrets[-1])
            if prime_len >= secret_len + min_zeros:
                print("Got prime {0} bytes long and a premaster_secret "
                      "{1} bytes long"
                      .format(prime_len, secret_len))
                done = True
            print("OK\n")

            if done:
                break

    print('')
    print("Check if the calculated DHE pre_master_secret is truncated when")
    print("there are zeros on most significant bytes")
    print("version: {0}\n".format(version))

    print("Test end")
    print("successful: {0}".format(good))
    print("failed: {0}".format(bad))
    failed_sorted = sorted(failed, key=natural_sort_keys)
    print(" {0}".format('\n '.join(repr(i) for i in failed_sorted)))

    if bad:
        sys.exit(1)
# Run the test only when this file is executed as a script.
if __name__ == "__main__":
    main()