/
record_tls13.py
223 lines (188 loc) · 8.42 KB
/
record_tls13.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) 2017 Maxence Tury
# 2019 Romain Perez
"""
Common TLS 1.3 fields & bindings.
This module covers the record layer, along with the ChangeCipherSpec, Alert and
ApplicationData submessages. For the Handshake type, see tls_handshake.py.
See the TLS class documentation for more information.
"""
import struct
from scapy.error import log_runtime, warning
from scapy.compat import raw, orb
from scapy.fields import ByteEnumField, PacketField, XStrField
from scapy.layers.tls.session import _GenericTLSSessionInheritance
from scapy.layers.tls.basefields import _TLSVersionField, _tls_version, \
_TLSMACField, _TLSLengthField, _tls_type
from scapy.layers.tls.record import _TLSMsgListField, TLS
from scapy.layers.tls.crypto.cipher_aead import AEADTagError
from scapy.layers.tls.crypto.cipher_stream import Cipher_NULL
from scapy.layers.tls.crypto.common import CipherError
from scapy.layers.tls.crypto.pkcs1 import pkcs_i2osp
###############################################################################
# TLS Record Protocol #
###############################################################################
class TLSInnerPlaintext(_GenericTLSSessionInheritance):
name = "TLS Inner Plaintext"
fields_desc = [_TLSMsgListField("msg", []),
ByteEnumField("type", None, _tls_type),
XStrField("pad", "")]
def pre_dissect(self, s):
"""
We need to parse the padding and type as soon as possible,
else we won't be able to parse the message list...
"""
if len(s) < 1:
raise Exception("Invalid InnerPlaintext (too short).")
tmp_len = len(s) - 1
if s[-1] != b"\x00":
msg_len = tmp_len
else:
n = 1
while s[-n] != b"\x00" and n < tmp_len:
n += 1
msg_len = tmp_len - n
self.fields_desc[0].length_from = lambda pkt: msg_len
self.type = struct.unpack("B", s[msg_len:msg_len + 1])[0]
return s
class _TLSInnerPlaintextField(PacketField):
def __init__(self, name, default, *args, **kargs):
super(_TLSInnerPlaintextField, self).__init__(name,
default,
TLSInnerPlaintext)
def m2i(self, pkt, m):
return self.cls(m, tls_session=pkt.tls_session)
def getfield(self, pkt, s):
tag_len = pkt.tls_session.rcs.mac_len
frag_len = pkt.len - tag_len
if frag_len < 1:
warning("InnerPlaintext should at least contain a byte type!")
return s, None
remain, i = super(_TLSInnerPlaintextField, self).getfield(pkt, s[:frag_len]) # noqa: E501
# remain should be empty here
return remain + s[frag_len:], i
def i2m(self, pkt, p):
if isinstance(p, _GenericTLSSessionInheritance):
p.tls_session = pkt.tls_session
if not pkt.tls_session.frozen:
return p.raw_stateful()
return raw(p)
class TLS13(_GenericTLSSessionInheritance):
__slots__ = ["deciphered_len"]
name = "TLS 1.3"
fields_desc = [ByteEnumField("type", 0x17, _tls_type),
_TLSVersionField("version", 0x0303, _tls_version),
_TLSLengthField("len", None),
_TLSInnerPlaintextField("inner", TLSInnerPlaintext()),
_TLSMACField("auth_tag", None)]
def __init__(self, *args, **kargs):
self.deciphered_len = kargs.get("deciphered_len", None)
super(TLS13, self).__init__(*args, **kargs)
# Parsing methods
def _tls_auth_decrypt(self, s):
"""
Provided with the record header and AEAD-ciphered data, return the
sliced and clear tuple (TLSInnerPlaintext, tag). Note that
we still return the slicing of the original input in case of decryption
failure. Also, if the integrity check fails, a warning will be issued,
but we still return the sliced (unauthenticated) plaintext.
"""
rcs = self.tls_session.rcs
read_seq_num = struct.pack("!Q", rcs.seq_num)
rcs.seq_num += 1
add_data = (pkcs_i2osp(self.type, 1) +
pkcs_i2osp(self.version, 2) +
pkcs_i2osp(len(s), 2))
try:
return rcs.cipher.auth_decrypt(add_data, s, read_seq_num)
except CipherError as e:
return e.args
except AEADTagError as e:
pkt_info = self.firstlayer().summary()
log_runtime.info("TLS 1.3: record integrity check failed [%s]", pkt_info) # noqa: E501
return e.args
def pre_dissect(self, s):
"""
Decrypt, verify and decompress the message.
"""
# We commit the pending read state if it has been triggered.
if self.tls_session.triggered_prcs_commit:
if self.tls_session.prcs is not None:
self.tls_session.rcs = self.tls_session.prcs
self.tls_session.prcs = None
self.tls_session.triggered_prcs_commit = False
if len(s) < 5:
raise Exception("Invalid record: header is too short.")
self.type = orb(s[0])
if (isinstance(self.tls_session.rcs.cipher, Cipher_NULL) or
self.type == 0x14):
self.deciphered_len = None
return s
else:
msglen = struct.unpack('!H', s[3:5])[0]
hdr, efrag, r = s[:5], s[5:5 + msglen], s[msglen + 5:]
frag, auth_tag = self._tls_auth_decrypt(efrag)
self.deciphered_len = len(frag)
return hdr + frag + auth_tag + r
def post_dissect(self, s):
"""
Commit the pending read state if it has been triggered. We update
nothing if the prcs was not set, as this probably means that we're
working out-of-context (and we need to keep the default rcs).
"""
if self.tls_session.triggered_prcs_commit:
if self.tls_session.prcs is not None:
self.tls_session.rcs = self.tls_session.prcs
self.tls_session.prcs = None
self.tls_session.triggered_prcs_commit = False
return s
def do_dissect_payload(self, s):
"""
Try to dissect the following data as a TLS message.
Note that overloading .guess_payload_class() would not be enough,
as the TLS session to be used would get lost.
"""
return TLS.do_dissect_payload(self, s)
# Building methods
def _tls_auth_encrypt(self, s):
"""
Return the TLSCiphertext.encrypted_record for AEAD ciphers.
"""
wcs = self.tls_session.wcs
write_seq_num = struct.pack("!Q", wcs.seq_num)
wcs.seq_num += 1
add_data = (pkcs_i2osp(self.type, 1) +
pkcs_i2osp(self.version, 2) +
pkcs_i2osp(len(s) + wcs.cipher.tag_len, 2))
return wcs.cipher.auth_encrypt(s, add_data, write_seq_num)
def post_build(self, pkt, pay):
"""
Apply the previous methods according to the writing cipher type.
"""
# Compute the length of TLSPlaintext fragment
hdr, frag = pkt[:5], pkt[5:]
if not isinstance(self.tls_session.wcs.cipher, Cipher_NULL):
frag = self._tls_auth_encrypt(frag)
if self.len is not None:
# The user gave us a 'len', let's respect this ultimately
hdr = hdr[:3] + struct.pack("!H", self.len)
else:
# Update header with the length of TLSCiphertext.inner
hdr = hdr[:3] + struct.pack("!H", len(frag))
# Now we commit the pending write state if it has been triggered. We
# update nothing if the pwcs was not set. This probably means that
# we're working out-of-context (and we need to keep the default wcs).
if self.tls_session.triggered_pwcs_commit:
if self.tls_session.pwcs is not None:
self.tls_session.wcs = self.tls_session.pwcs
self.tls_session.pwcs = None
self.tls_session.triggered_pwcs_commit = False
return hdr + frag + pay
def mysummary(self):
s, n = super(TLS13, self).mysummary()
if self.inner and self.inner.msg:
s += " / "
s += " / ".join(getattr(x, "_name", x.name) for x in self.inner.msg)
return s, n