-
Notifications
You must be signed in to change notification settings - Fork 77
/
messages.py
2371 lines (1974 loc) · 76 KB
/
messages.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Authors:
# Trevor Perrin
# Google - handling CertificateRequest.certificate_types
# Google (adapted by Sam Rushing and Marcelo Fernandez) - NPN support
# Dimitris Moraitis - Anon ciphersuites
# Yngve Pettersen (ported by Paul Sokolovsky) - TLS 1.2
# Hubert Kario - 'extensions' cleanup
#
# See the LICENSE file for legal information regarding use of this file.
"""Classes representing TLS messages."""
from .utils.compat import *
from .utils.cryptomath import *
from .errors import *
from .utils.codec import *
from .constants import *
from .x509 import X509
from .x509certchain import X509CertChain
from .utils.tackwrapper import *
from .utils.deprecations import deprecated_attrs, deprecated_params
from .extensions import *
from .utils.format_output import none_as_unknown
class RecordHeader(object):
    """Shared base of the SSLv2 and SSLv3 (and later) record header classes."""

    def __init__(self, ssl2):
        """
        Reset all header fields to their neutral values.

        :type ssl2: boolean
        :param ssl2: stored verbatim in the ``ssl2`` attribute; the SSLv2
            header subclass passes True, the SSLv3/TLS one passes False
        """
        # content type, (major, minor) version, payload length, SSLv2 flag
        self.type, self.version, self.length, self.ssl2 = 0, (0, 0), 0, ssl2
class RecordHeader3(RecordHeader):
    """Record header in the SSLv3 (and later TLS) layout."""

    def __init__(self):
        """Create an empty header that is flagged as not being SSLv2."""
        super(RecordHeader3, self).__init__(ssl2=False)

    def create(self, version, type, length):
        """
        Fill in the header fields ahead of serialisation.

        :param version: indexable pair, element 0 and 1 are written as the
            two version bytes
        :param type: content type value, written as a single byte
        :param length: payload length, written as two bytes
        :returns: this object, to allow call chaining
        """
        self.type, self.version, self.length = type, version, length
        return self

    def write(self):
        """
        Encode the header: 1 byte type, 2 bytes version, 2 bytes length.

        :returns: the ``bytes`` buffer of the Writer used for encoding
        """
        out = Writer()
        out.add(self.type, 1)
        # the version pair goes out as two separate single-byte fields
        out.add(self.version[0], 1)
        out.add(self.version[1], 1)
        out.add(self.length, 2)
        return out.bytes

    def parse(self, parser):
        """
        Read the header fields from the parser, in wire order.

        :param parser: object providing ``get(size)``
        :returns: this object, to allow call chaining
        """
        self.type = parser.get(1)
        major = parser.get(1)
        minor = parser.get(1)
        self.version = (major, minor)
        self.length = parser.get(2)
        self.ssl2 = False
        return self

    @property
    def typeName(self):
        """
        Name of the ContentType attribute whose value equals ``type``.

        Falls back to ``unknown(<type>)`` when no attribute matches.
        """
        for name, value in ContentType.__dict__.items():
            if value == self.type:
                # first match in the class dictionary wins
                return str(name)
        return "unknown({0!s})".format(self.type)

    def __str__(self):
        """Return a human readable, comma separated summary of the header."""
        template = ("SSLv3 record,version({0[0]}.{0[1]}),"
                    "content type({1}),length({2})")
        return template.format(self.version, self.typeName, self.length)

    def __repr__(self):
        """Return a compact representation using the numeric type value."""
        template = ("RecordHeader3(type={0}, version=({1[0]}.{1[1]}), "
                    "length={2})")
        return template.format(self.type, self.version, self.length)
class RecordHeader2(RecordHeader):
    """
    Record header in the SSLv2 layout.

    :vartype padding: int
    :ivar padding: number of bytes added at end of message to make it multiple
        of block cipher size
    :vartype securityEscape: boolean
    :ivar securityEscape: whether the record contains a security escape message
    """

    def __init__(self):
        """Create an empty header flagged as SSLv2, without padding."""
        super(RecordHeader2, self).__init__(ssl2=True)
        self.padding = 0
        self.securityEscape = False

    def parse(self, parser):
        """
        Read a two or three byte SSLv2 header from the parser.

        When the top bit of the first byte is set the header is two bytes
        long and carries a 15 bit length. Otherwise the length is 14 bits,
        bit 0x40 is the security escape flag and a third byte holds the
        padding size. ``padding`` and ``securityEscape`` are only assigned
        in the three byte case.

        :param parser: object providing ``get(size)``
        :returns: this object, to allow call chaining
        """
        high = parser.get(1)
        low = parser.get(1)
        two_byte_header = bool(high & 0x80)
        length_mask = 0x7f if two_byte_header else 0x3f
        self.length = ((high & length_mask) << 8) | low
        if not two_byte_header:
            self.securityEscape = (high & 0x40) != 0
            self.padding = parser.get(1)
        # the header itself has no type or version fields; fixed values
        # are assigned instead
        self.type = ContentType.handshake
        self.version = (2, 0)
        return self

    def create(self, length, padding=0, securityEscape=False):
        """
        Set the values to be serialised.

        :param length: length value to encode in the header
        :param padding: padding size, a truthy value selects the three byte
            header
        :param securityEscape: security escape flag, a truthy value selects
            the three byte header
        :returns: this object, to allow call chaining
        """
        self.length, self.padding = length, padding
        self.securityEscape = securityEscape
        return self

    def write(self):
        """
        Encode the header in the shortest form able to carry its fields.

        :returns: the ``bytes`` buffer of the Writer used for encoding
        :raises ValueError: when length does not fit in 15 bits (two byte
            header) or 14 bits (three byte header)
        """
        out = Writer()
        needs_long_header = bool(self.padding or self.securityEscape)
        limit = 0x4000 if needs_long_header else 0x8000
        if self.length >= limit:
            raise ValueError("length too large")

        # top bit marks the two byte form
        lead = 0 if needs_long_header else 0x80
        if self.securityEscape:
            lead |= 0x40
        lead |= self.length >> 8

        out.add(lead, 1)
        out.add(self.length & 0xff, 1)
        if needs_long_header:
            out.add(self.padding, 1)
        return out.bytes
class Message(object):
    """Generic TLS message: a content type paired with opaque data."""

    def __init__(self, contentType, data):
        """
        Remember the content type and the data without modifying them.

        :type contentType: int
        :param contentType: TLS record layer content type of associated data
        :type data: bytearray
        :param data: data
        """
        self.contentType, self.data = contentType, data

    def write(self):
        """Return the stored data object itself (not a copy)."""
        return self.data
class Alert(object):
    """Alert message: a one byte level followed by a one byte description."""

    def __init__(self):
        """Create an alert with level and description both set to zero."""
        self.contentType = ContentType.alert
        self.level = 0
        self.description = 0

    def create(self, description, level=AlertLevel.fatal):
        """
        Set the alert values to be serialised.

        :param description: numeric alert description
        :param level: numeric alert level, ``AlertLevel.fatal`` when omitted
        :returns: this object, to allow call chaining
        """
        self.level, self.description = level, description
        return self

    def parse(self, p):
        """
        Read level and description, one byte each, from the parser.

        The two reads are bracketed by the parser's length check, which is
        set to 2 before reading and stopped afterwards.

        :returns: this object, to allow call chaining
        """
        p.setLengthCheck(2)
        self.level = p.get(1)
        self.description = p.get(1)
        p.stopLengthCheck()
        return self

    def write(self):
        """Encode level then description as single bytes."""
        out = Writer()
        for field in (self.level, self.description):
            out.add(field, 1)
        return out.bytes

    @property
    def levelName(self):
        """Textual form of ``level`` as produced by ``AlertLevel.toRepr``."""
        name = AlertLevel.toRepr(self.level)
        return none_as_unknown(name, self.level)

    @property
    def descriptionName(self):
        """Textual form of ``description`` via ``AlertDescription.toRepr``."""
        name = AlertDescription.toRepr(self.description)
        return none_as_unknown(name, self.description)

    def __str__(self):
        """Return a human readable summary using the textual names."""
        template = "Alert, level:{0}, description:{1}"
        return template.format(self.levelName, self.descriptionName)

    def __repr__(self):
        """Return a representation using the raw level and description."""
        template = "Alert(level={0}, description={1})"
        return template.format(self.level, self.description)
class HandshakeMsg(object):
    """Base class of handshake messages; adds the handshake header."""

    def __init__(self, handshakeType):
        """
        Record the handshake type and mark the content type as handshake.

        :param handshakeType: value later written as the first header byte
        """
        self.contentType = ContentType.handshake
        self.handshakeType = handshakeType

    def __eq__(self, other):
        """
        Compare two messages by their serialised form.

        Objects are equal only when both provide ``write`` and the two
        results compare equal; anything else is unequal.
        """
        both_serialisable = hasattr(self, "write") and hasattr(other, "write")
        if not both_serialisable:
            return False
        return self.write() == other.write()

    def __ne__(self, other):
        """Return the negation of :py:meth:`__eq__`."""
        return not self.__eq__(other)

    def postWrite(self, w):
        """
        Prefix an encoded message body with the handshake header.

        :param w: writer whose ``bytes`` hold the message body
        :returns: 1 byte handshake type, 3 byte body length, then the body
        """
        header = Writer()
        header.add(self.handshakeType, 1)
        body = w.bytes
        header.add(len(body), 3)
        return header.bytes + body
class HelloMessage(HandshakeMsg):
    """
    Extension handling shared by :py:class:`ClientHello` and
    :py:class:`ServerHello`.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent and start with no extensions."""
        super(HelloMessage, self).__init__(*args, **kwargs)
        # None means "no extensions list at all", distinct from an empty list
        self.extensions = None

    def getExtension(self, extType):
        """
        Look up the single extension with the given type.

        :param extType: value compared against ``extType`` of every extension
        :rtype: ~tlslite.extensions.TLSExtension
        :returns: the matching extension, or None when the list is unset or
            holds no match
        :raises TLSInternalError: when there are multiple extensions of the
            same type
        """
        if self.extensions is None:
            return None

        matches = [ext for ext in self.extensions if ext.extType == extType]
        if not matches:
            return None
        if len(matches) > 1:
            raise TLSInternalError(
                "Multiple extensions of the same type present")
        return matches[0]

    def addExtension(self, ext):
        """
        Append an extension, creating the internal list when it is unset.

        :type ext: TLSExtension
        :param ext: extension object to add to list
        """
        if self.extensions is None:
            self.extensions = []
        self.extensions.append(ext)

    def _addExt(self, extType):
        """Add an empty extension of given type, if not already present."""
        if self.getExtension(extType) is None:
            empty = TLSExtension(extType=extType).create(bytearray(0))
            self.addExtension(empty)

    def _removeExt(self, extType):
        """Drop every extension of given type, keeping the same list object."""
        if self.extensions is None:
            return
        # slice assignment mutates the existing list in place
        self.extensions[:] = [ext for ext in self.extensions
                              if ext.extType != extType]

    def _addOrRemoveExt(self, extType, add):
        """
        Ensure an empty extension of given type is present or absent.

        :type extType: int
        :param extType: numeric id of extension to add or remove
        :type add: boolean
        :param add: whether to add (True) or remove (False) the extension
        """
        action = self._addExt if add else self._removeExt
        action(extType)
class ClientHello(HelloMessage):
    """
    Class for handling the ClientHello SSLv2/SSLv3/TLS message.

    :vartype certificate_types: list
    :ivar certificate_types: list of supported certificate types
        (deprecated)

    :vartype srp_username: bytearray
    :ivar srp_username: name of the user in SRP extension (deprecated)

    :vartype ~.supports_npn: boolean
    :ivar ~.supports_npn: NPN extension presence (deprecated)

    :vartype ~.tack: boolean
    :ivar ~.tack: TACK extension presence (deprecated)

    :vartype ~.server_name: bytearray
    :ivar ~.server_name: first host_name (type 0) present in SNI extension
        (deprecated)

    :vartype extensions: list of :py:class:`TLSExtension`
    :ivar extensions: list of TLS extensions parsed from wire or to send, see
        :py:class:`TLSExtension` and child classes for exact examples
    """

    def __init__(self, ssl2=False):
        """
        Create an empty ClientHello.

        :type ssl2: boolean
        :param ssl2: when True, :py:meth:`parse` and :py:meth:`write` use
            the SSLv2 layout instead of the SSLv3/TLS one
        """
        super(ClientHello, self).__init__(HandshakeType.client_hello)
        self.ssl2 = ssl2
        self.client_version = (0, 0)
        self.random = bytearray(32)
        self.session_id = bytearray(0)
        self.cipher_suites = []         # a list of 16-bit values
        self.compression_methods = []   # a list of 8-bit values

    def __str__(self):
        """
        Return human readable representation of Client Hello.

        :rtype: str
        """
        # a non-empty, all-zero session ID is shown in a compact form
        if self.session_id.count(bytearray(b'\x00')) == len(self.session_id)\
                and len(self.session_id) != 0:
            session = "bytearray(b'\\x00'*{0})".format(len(self.session_id))
        else:
            session = repr(self.session_id)
        ret = "client_hello,version({0[0]}.{0[1]}),random(...),"\
              "session ID({1!s}),cipher suites({2!r}),"\
              "compression methods({3!r})".format(
                  self.client_version, session,
                  self.cipher_suites, self.compression_methods)

        if self.extensions is not None:
            ret += ",extensions({0!r})".format(self.extensions)

        return ret

    def __repr__(self):
        """
        Return machine readable representation of Client Hello.

        :rtype: str
        """
        return "ClientHello(ssl2={0}, client_version=({1[0]}.{1[1]}), "\
               "random={2!r}, session_id={3!r}, cipher_suites={4!r}, "\
               "compression_methods={5}, extensions={6})".format(
                   self.ssl2, self.client_version, self.random,
                   self.session_id, self.cipher_suites,
                   self.compression_methods, self.extensions)

    @property
    def certificate_types(self):
        """
        Return the list of certificate types supported.

        .. deprecated:: 0.5
            use extensions field to get the extension for inspection
        """
        cert_type = self.getExtension(ExtensionType.cert_type)
        if cert_type is None:
            # XXX backwards compatibility: TLSConnection
            # depends on a default value of this property
            return [CertificateType.x509]
        else:
            return cert_type.certTypes

    @certificate_types.setter
    def certificate_types(self, val):
        """
        Set list of supported certificate types.

        Sets the list of supported types to list given in :py:obj:`val` if the
        cert_type extension is present. Creates the extension and places it
        last in the list otherwise.

        :type val: list
        :param val: list of supported certificate types by client encoded as
            single byte integers
        """
        cert_type = self.getExtension(ExtensionType.cert_type)

        if cert_type is None:
            ext = ClientCertTypeExtension().create(val)
            self.addExtension(ext)
        else:
            cert_type.certTypes = val

    @property
    def srp_username(self):
        """
        Return username for the SRP.

        .. deprecated:: 0.5
            use extensions field to get the extension for inspection
        """
        srp_ext = self.getExtension(ExtensionType.srp)

        if srp_ext is None:
            return None
        else:
            return srp_ext.identity

    @srp_username.setter
    def srp_username(self, name):
        """
        Set the username for SRP.

        Updates the existing SRP extension, or appends a new one when none
        is present.

        :type name: bytearray
        :param name: UTF-8 encoded username
        """
        srp_ext = self.getExtension(ExtensionType.srp)

        if srp_ext is None:
            ext = SRPExtension().create(name)
            self.addExtension(ext)
        else:
            srp_ext.identity = name

    @property
    def tack(self):
        """
        Return whether the client supports TACK.

        .. deprecated:: 0.5
            use extensions field to get the extension for inspection

        :rtype: boolean
        """
        return self.getExtension(ExtensionType.tack) is not None

    @tack.setter
    def tack(self, present):
        """
        Create or delete the TACK extension.

        :type present: boolean
        :param present: True will create extension while False will remove
            extension from client hello
        """
        self._addOrRemoveExt(ExtensionType.tack, present)

    @property
    def supports_npn(self):
        """
        Return whether client supports NPN extension.

        .. deprecated:: 0.5
            use extensions field to get the extension for inspection

        :rtype: boolean
        """
        return self.getExtension(ExtensionType.supports_npn) is not None

    @supports_npn.setter
    def supports_npn(self, present):
        """
        Create or delete the NPN extension.

        :type present: boolean
        :param present: selects whether to create or remove the extension
            from list of supported ones
        """
        self._addOrRemoveExt(ExtensionType.supports_npn, present)

    @property
    def server_name(self):
        """
        Return first host_name present in SNI extension.

        An empty bytearray is returned when the extension is missing or
        lists no host names.

        .. deprecated:: 0.5
            use extensions field to get the extension for inspection

        :rtype: bytearray
        """
        sni_ext = self.getExtension(ExtensionType.server_name)
        if sni_ext is None:
            return bytearray(0)
        else:
            if len(sni_ext.hostNames) > 0:
                return sni_ext.hostNames[0]
            else:
                return bytearray(0)

    @server_name.setter
    def server_name(self, hostname):
        """
        Set the first host_name present in SNI extension.

        Creates the SNI extension when it is missing. When the extension is
        present its first host name is replaced; if it currently lists no
        host names, the given one becomes the only entry.

        :type hostname: bytearray
        :param hostname: name of the host_name to set
        """
        sni_ext = self.getExtension(ExtensionType.server_name)
        if sni_ext is None:
            sni_ext = SNIExtension().create(hostname)
            self.addExtension(sni_ext)
        else:
            names = list(sni_ext.hostNames)
            if names:
                names[0] = hostname
            else:
                # the getter tolerates an empty host name list, so the
                # setter must not fail with IndexError on it either
                names.append(hostname)
            sni_ext.hostNames = names

    def create(self, version, random, session_id, cipher_suites,
               certificate_types=None, srpUsername=None,
               tack=False, supports_npn=None, serverName=None,
               extensions=None):
        """
        Create a ClientHello message for sending.

        :type version: tuple
        :param version: the highest supported TLS version encoded as two int
            tuple

        :type random: bytearray
        :param random: client provided random value, in old versions of TLS
            (before 1.2) the first 32 bits should include system time, also
            used as the "challenge" field in SSLv2

        :type session_id: bytearray
        :param session_id: ID of session, set when doing session resumption

        :type cipher_suites: list
        :param cipher_suites: list of ciphersuites advertised as supported

        :type certificate_types: list
        :param certificate_types: list of supported certificate types, uses
            TLS extension for signalling, as such requires TLS1.0 to work

        :type srpUsername: bytearray
        :param srpUsername: utf-8 encoded username for SRP, TLS extension

        :type tack: boolean
        :param tack: whether to advertise support for TACK, TLS extension;
            always applied, so False removes a TACK extension passed in
            through :py:obj:`extensions`

        :type supports_npn: boolean
        :param supports_npn: whether to advertise support for NPN, TLS
            extension

        :type serverName: str, bytes or bytearray
        :param serverName: the hostname to request in server name indication
            extension, TLS extension. Text is encoded as UTF-8, bytes and
            bytearray values are used as provided. Note that SNI allows to
            set multiple hostnames and values that are not hostnames, use
            :py:class:`~.extensions.SNIExtension`
            together with :py:obj:`extensions` to use it.

        :type extensions: list of :py:class:`~.extensions.TLSExtension`
        :param extensions: list of extensions to advertise

        :returns: this object, to allow call chaining
        :raises TypeError: when srpUsername is not a bytearray
        """
        self.client_version = version
        self.random = random
        self.session_id = session_id
        self.cipher_suites = cipher_suites
        self.compression_methods = [0]
        # the explicit list goes first, the convenience arguments below
        # then update or extend it
        if extensions is not None:
            self.extensions = extensions
        if certificate_types is not None:
            self.certificate_types = certificate_types
        if srpUsername is not None:
            if not isinstance(srpUsername, bytearray):
                raise TypeError("srpUsername must be a bytearray object")
            self.srp_username = srpUsername
        self.tack = tack
        if supports_npn is not None:
            self.supports_npn = supports_npn
        if serverName is not None:
            if isinstance(serverName, (bytes, bytearray)):
                # already encoded; bytearray(value, "utf-8") would raise
                # TypeError for these on Python 3
                self.server_name = bytearray(serverName)
            else:
                self.server_name = bytearray(serverName, "utf-8")
        return self

    def parse(self, p):
        """
        Deserialise object from on the wire data.

        Uses the SSLv2 layout when the object was created with ``ssl2=True``
        and the SSLv3/TLS layout otherwise.

        :param p: parser positioned at the start of the message body
        :returns: this object, to allow call chaining
        """
        if self.ssl2:
            self.client_version = (p.get(1), p.get(1))
            cipherSpecsLength = p.get(2)
            sessionIDLength = p.get(2)
            randomLength = p.get(2)
            p.setLengthCheck(cipherSpecsLength +
                             sessionIDLength +
                             randomLength)
            # SSLv2 cipher specs are 3 bytes each
            self.cipher_suites = p.getFixList(3, cipherSpecsLength//3)
            self.session_id = p.getFixBytes(sessionIDLength)
            self.random = p.getFixBytes(randomLength)
            if len(self.random) < 32:
                # left-pad a short random with zero bytes up to 32 bytes
                zeroBytes = 32-len(self.random)
                self.random = bytearray(zeroBytes) + self.random
            self.compression_methods = [0]  # Fake this value
            p.stopLengthCheck()
        else:
            p.startLengthCheck(3)
            self.client_version = (p.get(1), p.get(1))
            self.random = p.getFixBytes(32)
            self.session_id = p.getVarBytes(1)
            self.cipher_suites = p.getVarList(2, 2)
            self.compression_methods = p.getVarList(1, 1)
            # anything left before the end of the message is the
            # extensions block
            if not p.atLengthCheck():
                self.extensions = []
                totalExtLength = p.get(2)
                p2 = Parser(p.getFixBytes(totalExtLength))
                while p2.getRemainingLength() > 0:
                    ext = TLSExtension().parse(p2)
                    self.extensions += [ext]
            p.stopLengthCheck()
        return self

    def _writeSSL2(self):
        """Serialise SSLv2 object to on the wire data."""
        writer = Writer()
        writer.add(self.handshakeType, 1)
        writer.add(self.client_version[0], 1)
        writer.add(self.client_version[1], 1)

        ciphersWriter = Writer()
        ciphersWriter.addFixSeq(self.cipher_suites, 3)

        # the three length fields precede the three variable length fields
        writer.add(len(ciphersWriter.bytes), 2)
        writer.add(len(self.session_id), 2)
        writer.add(len(self.random), 2)

        writer.bytes += ciphersWriter.bytes
        writer.bytes += self.session_id
        writer.bytes += self.random

        # postWrite() is necessary only for SSLv3/TLS
        return writer.bytes

    def _write(self):
        """Serialise SSLv3 or TLS object to on the wire data."""
        w = Writer()
        w.add(self.client_version[0], 1)
        w.add(self.client_version[1], 1)
        w.bytes += self.random
        w.addVarSeq(self.session_id, 1, 1)
        w.addVarSeq(self.cipher_suites, 2, 2)
        w.addVarSeq(self.compression_methods, 1, 1)

        # an empty (but not None) list still emits a zero length
        # extensions block
        if self.extensions is not None:
            w2 = Writer()
            for ext in self.extensions:
                w2.bytes += ext.write()

            w.add(len(w2.bytes), 2)
            w.bytes += w2.bytes
        return self.postWrite(w)

    def psk_truncate(self):
        """Return a truncated encoding of message without binders.

        In TLS 1.3, with PSK exchange, the ClientHello message is signed
        by the binders in it. Return the part that is symmetrically signed
        by those binders.

        See "PSK Binder" in draft-ietf-tls-tls13-23.

        :rtype: bytearray
        :raises ValueError: when the message has no extensions or the last
            one is not the pre_shared_key extension
        """
        # guard first, so a missing or empty list gives the same error as
        # a wrong last extension instead of TypeError/IndexError
        if not self.extensions:
            raise ValueError("Last extension must be the pre_shared_key "
                             "extension")
        ext = self.extensions[-1]
        if not isinstance(ext, PreSharedKeyExtension):
            raise ValueError("Last extension must be the pre_shared_key "
                             "extension")
        bts = self.write()
        # every binder has 1 byte long header and the list of them
        # has a 2 byte header
        length = sum(len(i) + 1 for i in ext.binders) + 2

        return bts[:-length]

    def write(self):
        """Serialise object to on the wire data."""
        if self.ssl2:
            return self._writeSSL2()
        else:
            return self._write()
class HelloRequest(HandshakeMsg):
    """Hello Request handshake message, which carries no body."""

    def __init__(self):
        """Create the message with the hello_request handshake type."""
        super(HelloRequest, self).__init__(HandshakeType.hello_request)

    def create(self):
        """Return this object unchanged; there are no fields to set."""
        return self

    def write(self):
        """Return the handshake header wrapped around an empty body."""
        empty_body = Writer()
        return self.postWrite(empty_body)

    def parse(self, parser):
        """
        Consume the length header and check that no body follows it.

        :returns: this object, to allow call chaining
        """
        # the buffer holds just the length from the header, so the length
        # check is closed right after it is opened
        parser.startLengthCheck(3)
        parser.stopLengthCheck()
        return self
class ServerHello(HelloMessage):
    """
    Server Hello handshake message.

    :vartype server_version: tuple
    :ivar server_version: protocol version encoded as two int tuple

    :vartype random: bytearray
    :ivar random: server random value

    :vartype session_id: bytearray
    :ivar session_id: session identifier for resumption

    :vartype cipher_suite: int
    :ivar cipher_suite: server selected cipher_suite

    :vartype compression_method: int
    :ivar compression_method: server selected compression method

    :vartype next_protos: list of bytearray
    :ivar next_protos: list of advertised protocols in NPN extension

    :vartype next_protos_advertised: list of bytearray
    :ivar next_protos_advertised: list of protocols advertised in NPN extension

    :vartype certificate_type: int
    :ivar certificate_type: certificate type selected by server

    :vartype extensions: list
    :ivar extensions: list of TLS extensions present in server_hello message,
        see :py:class:`~.extensions.TLSExtension` and child classes for exact
        examples
    """

    def __init__(self):
        """Create an empty ServerHello with zeroed fields."""
        super(ServerHello, self).__init__(HandshakeType.server_hello)
        self.server_version = (0, 0)
        self.random = bytearray(32)
        self.session_id = bytearray(0)
        self.cipher_suite = 0
        self.compression_method = 0
        # TACK extension object set through, or cached by, ``tackExt``
        self._tack_ext = None

    def __str__(self):
        """
        Return a human readable summary of the message.

        The reported length is the size of the serialised message minus the
        4 leading bytes.
        """
        summary = (
            "server_hello,length({0}),version({1[0]}.{1[1]}),random(...),"
            "session ID({2!r}),cipher({3:#x}),compression method({4})"
        ).format(len(self.write())-4, self.server_version,
                 self.session_id, self.cipher_suite,
                 self.compression_method)

        if self.extensions is None:
            return summary

        listing = ",".join(repr(ext) for ext in self.extensions)
        return summary + ",extensions[" + listing + "]"

    def __repr__(self):
        """Return a representation listing all the fields of the message."""
        template = (
            "ServerHello(server_version=({0[0]}, {0[1]}), random={1!r}, "
            "session_id={2!r}, cipher_suite={3}, compression_method={4}, "
            "_tack_ext={5}, extensions={6!r})")
        return template.format(
            self.server_version, self.random, self.session_id,
            self.cipher_suite, self.compression_method, self._tack_ext,
            self.extensions)

    @property
    def tackExt(self):
        """
        Return the TACK extension.

        When none was set explicitly, it is built from the TACK entry of
        ``extensions`` and cached; None is returned if there is no such
        entry or ``tackpyLoaded`` is false.
        """
        if self._tack_ext is not None:
            return self._tack_ext

        raw_ext = self.getExtension(ExtensionType.tack)
        if raw_ext is None or not tackpyLoaded:
            return None

        self._tack_ext = TackExtension(raw_ext.extData)
        return self._tack_ext

    @tackExt.setter
    def tackExt(self, val):
        """Set the TACK extension."""
        self._tack_ext = val
        # write() emits extensions only when the list is not None, so make
        # sure there is one whenever a TACK extension has to be sent
        if val is not None and self.extensions is None:
            self.extensions = []

    @property
    def certificate_type(self):
        """
        Return the certificate type selected by server.

        :rtype: int
        """
        selected = self.getExtension(ExtensionType.cert_type)
        if selected is not None:
            return selected.cert_type
        # XXX backwards compatibility, TLSConnection expects the default
        # value to be that
        return CertificateType.x509

    @certificate_type.setter
    def certificate_type(self, val):
        """
        Set the certificate type supported.

        :type val: int
        :param val: type of certificate, None and x509 remove the extension
        """
        if val == CertificateType.x509 or val is None:
            # XXX backwards compatibility, x509 value should not be sent
            self._removeExt(ExtensionType.cert_type)
            return

        existing = self.getExtension(ExtensionType.cert_type)
        if existing is not None:
            existing.cert_type = val
            return
        self.addExtension(ServerCertTypeExtension().create(val))

    @property
    def next_protos(self):
        """
        Return the advertised protocols in NPN extension.

        :rtype: list of bytearrays
        :returns: the protocols, or None when there is no NPN extension
        """
        npn_ext = self.getExtension(ExtensionType.supports_npn)
        return None if npn_ext is None else npn_ext.protocols

    @next_protos.setter
    def next_protos(self, val):
        """
        Set the advertised protocols in NPN extension.

        :type val: list
        :param val: list of protocols to advertise as UTF-8 encoded names,
            None removes the extension
        """
        if val is None:
            # XXX: do not send empty extension
            self._removeExt(ExtensionType.supports_npn)
            return

        # convenience: make sure the values are properly encoded
        protocols = [bytearray(name) for name in val]

        npn_ext = self.getExtension(ExtensionType.supports_npn)
        if npn_ext is not None:
            npn_ext.protocols = protocols
            return
        self.addExtension(NPNExtension().create(protocols))

    @property
    def next_protos_advertised(self):
        """
        Return the advertised protocols in NPN extension.

        Alias of :py:attr:`next_protos`.

        :rtype: list of bytearrays
        """
        return self.next_protos

    @next_protos_advertised.setter
    def next_protos_advertised(self, val):
        """
        Set the advertised protocols in NPN extension.

        Alias of :py:attr:`next_protos`.

        :type val: list
        :param val: list of protocols to advertise as UTF-8 encoded names
        """
        self.next_protos = val

    def create(self, version, random, session_id, cipher_suite,
               certificate_type=None, tackExt=None,
               next_protos_advertised=None,
               extensions=None):
        """
        Set the fields of the message ahead of serialisation.

        The extensions list is assigned first; the certificate type and NPN
        arguments are then applied through their properties, which add,
        update or remove entries of that list.

        :returns: this object, to allow call chaining
        """
        self.extensions = extensions
        self.server_version = version
        self.random = random
        self.session_id = session_id
        self.cipher_suite = cipher_suite
        # property: None (or x509) removes the cert_type extension
        self.certificate_type = certificate_type
        self.compression_method = 0
        if tackExt is not None:
            self.tackExt = tackExt
        # property: None removes the NPN extension
        self.next_protos_advertised = next_protos_advertised
        return self

    def parse(self, p):
        """
        Deserialise the message from the parser.

        :returns: this object, to allow call chaining
        """
        p.startLengthCheck(3)
        self.server_version = (p.get(1), p.get(1))
        self.random = p.getFixBytes(32)
        self.session_id = p.getVarBytes(1)
        self.cipher_suite = p.get(2)
        self.compression_method = p.get(1)
        # anything left before the end of the message is the extensions block
        if not p.atLengthCheck():
            self.extensions = []
            totalExtLength = p.get(2)
            ext_parser = Parser(p.getFixBytes(totalExtLength))
            # a random equal to TLS_1_3_HRR selects the "hrr" flavour of
            # extension parsing, anything else the "server" one
            if self.random == TLS_1_3_HRR:
                ext_kwargs = {"hrr": True}
            else:
                ext_kwargs = {"server": True}
            while ext_parser.getRemainingLength() > 0:
                parsed = TLSExtension(**ext_kwargs).parse(ext_parser)
                self.extensions.append(parsed)
        p.stopLengthCheck()
        return self

    def write(self):
        """
        Serialise the message, including the handshake header.

        Extensions are emitted only when ``extensions`` is not None; the
        TACK extension, when available, is appended after them.
        """
        w = Writer()
        w.add(self.server_version[0], 1)
        w.add(self.server_version[1], 1)
        w.bytes += self.random
        w.addVarSeq(self.session_id, 1, 1)
        w.add(self.cipher_suite, 2)
        w.add(self.compression_method, 1)

        if self.extensions is not None:
            ext_writer = Writer()
            for ext in self.extensions:
                ext_writer.bytes += ext.write()

            # NOTE(review): when tackExt was derived from a TACK entry
            # already in self.extensions, that entry was written by the
            # loop above as well - confirm this is intended
            tack = self.tackExt
            if tack:
                serialised = tack.serialize()
                ext_writer.add(ExtensionType.tack, 2)
                ext_writer.add(len(serialised), 2)
                ext_writer.bytes += serialised

            w.add(len(ext_writer.bytes), 2)
            w.bytes += ext_writer.bytes
        return self.postWrite(w)
class ServerHello2(HandshakeMsg):
"""
SERVER-HELLO message from SSLv2.
:vartype session_id_hit: int
:ivar session_id_hit: non zero if the client provided session ID was
matched in server's session cache
:vartype certificate_type: int
:ivar certificate_type: type of certificate sent
:vartype server_version: tuple of ints
:ivar server_version: protocol version selected by server
:vartype certificate: bytearray
:ivar certificate: certificate sent by server
:vartype ciphers: array of int
:ivar ciphers: list of ciphers supported by server
:vartype session_id: bytearray
:ivar session_id: idendifier of negotiated session
"""
def __init__(self):
super(ServerHello2, self).__init__(SSL2HandshakeType.server_hello)
self.session_id_hit = 0
self.certificate_type = 0
self.server_version = (0, 0)
self.certificate = bytearray(0)
self.ciphers = []
self.session_id = bytearray(0)
def create(self, session_id_hit, certificate_type, server_version,
certificate, ciphers, session_id):
"""Initialize fields of the SERVER-HELLO message."""
self.session_id_hit = session_id_hit