Skip to content

Commit

Permalink
Added basic support for fragmentation
Browse files Browse the repository at this point in the history
- Override Packet fragment() in TLSRecord(). Allows fragmenting
packet on defined offsets
  • Loading branch information
alexmgr committed Jul 20, 2015
1 parent 4d92a78 commit 6ad9dce
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
13 changes: 13 additions & 0 deletions scapy_ssl_tls/ssl_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,19 @@ def guess_payload_class(self, payload):
pass
return cls

def fragment(self, size=2**14):
payload = str(self.payload)
payloads = [payload[i: i+size] for i in range(0, len(self.payload), size)]
fragments = []
for payload in payloads:
fragments.append(TLSRecord(content_type=self.content_type, version=self.version, length=len(payload)) /
payload)
try:
stack = TLS.from_records(fragments, ctx=self.tls_ctx)
except struct.error as se:
raise ValueError("Fragment offset must be block aligned: %s" % se)
return stack

class TLSServerName(PacketNoPadding):
name = "TLS Servername"
fields_desc = [ByteEnumField("type", 0x00, {0x00:"host"}),
Expand Down
31 changes: 29 additions & 2 deletions tests/test_ssl_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,35 @@ def test_pkt_built_from_stacked_tls_handshakes_is_identical(self):
# check TLS layers one by one
self.assertEqual(re.findall(r'<(TLS[\w]+)',str(repr(self.stacked_handshake))), ['TLSRecord', 'TLSHandshake', 'TLSServerHello',
'TLSHandshake', 'TLSCertificateList', 'TLSCertificate',
'TLSHandshake', 'TLSServerHelloDone'])

'TLSHandshake', 'TLSServerHelloDone'])

def test_fragmentation_fails_on_non_aligned_boundary_for_handshakes(self):
pkt = tls.TLSRecord()/tls.TLSHandshake()/tls.TLSClientHello()
with self.assertRaises(ValueError):
pkt.fragment(7)
self.assertIsInstance(pkt.fragment(8), tls.TLS)

def test_fragmenting_a_record_returns_a_list_of_records_when_fragment_size_is_smaller_than_record(self):
frag_size = 3
app_data = "A"*7
pkt = tls.TLSRecord(version=tls.TLSVersion.TLS_1_1, content_type=tls.TLSContentType.APPLICATION_DATA)/app_data
fragments = pkt.fragment(frag_size)
self.assertEqual(len(fragments.records), len(app_data) / frag_size + len(app_data) % frag_size)
record_length = len(tls.TLSRecord())
self.assertTrue(all(list(map(lambda x: x.haslayer(tls.TLSRecord), fragments.records))))
self.assertEqual(len(fragments.records[0]), record_length + frag_size)
self.assertEqual(len(fragments.records[1]), record_length + frag_size)
self.assertEqual(len(fragments.records[2]), record_length + (len(app_data) % frag_size))

def test_fragmenting_a_record_does_nothing_when_fragment_size_is_larger_than_record(self):
app_data = "A"*7
frag_size = len(app_data)
pkt = tls.TLSRecord(version=tls.TLSVersion.TLS_1_1, content_type=tls.TLSContentType.APPLICATION_DATA)/app_data
self.assertEqual(str(pkt), str(pkt.fragment(frag_size)))
frag_size = len(app_data) * 2
self.assertEqual(str(pkt), str(pkt.fragment(frag_size)))


class TestTLSDissector(unittest.TestCase):

def setUp(self):
Expand Down

0 comments on commit 6ad9dce

Please sign in to comment.