Skip to content

Commit

Permalink
[test suite] update test/spa_fuzzing.py to use Python3
Browse files Browse the repository at this point in the history
  • Loading branch information
mrash committed Feb 4, 2024
1 parent 553a20c commit 0bba17c
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 61 deletions.
3 changes: 2 additions & 1 deletion ChangeLog
@@ -1,4 +1,4 @@
fwknop-2.6.11 (12/09/2019):
fwknop-2.6.11 (02/04/2024):
- [server] (Amin Massad) Fixed two bugs in PF handling code - one for
indefinitely repeating error messages "Did not find expire comment in
rules list 0" in rule deletion code, and the second where min_exp was
Expand All @@ -18,6 +18,7 @@ fwknop-2.6.11 (12/09/2019):
Ubuntu 22.04 for example.
- [test suite] Prefer the 'ip' command over the older 'ifconfig' command
for interface operations and loopback detection.
- [test suite] Update the 'spa_fuzzing.py' fuzzer to use Python3.

fwknop-2.6.10 (08/06/2018):
- [server] Add MAX_FW_TIMEOUT to access.conf stanzas to allow a maximum
Expand Down
126 changes: 66 additions & 60 deletions test/spa_fuzzing.py
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
#
# Purpose: This script generates SPA packet payloads that are designed to
# act as a fuzzer against libfko SPA decoding routines.
Expand All @@ -16,6 +16,7 @@
# 1716411011200157:cm9vdA:1397329899:2.0.1:1:MTI3LjAuMC4yLHRjcC8yMw
#

import itertools
import base64
import argparse

Expand All @@ -32,6 +33,7 @@ def main():

print_hdr()

### these are the base payloads that are mutated by the fuzzer
spa_payloads = [

# type 1: normal access request
Expand Down Expand Up @@ -81,8 +83,8 @@ def main():

payload_num += 1

print "# start tests with payload: ", spa_payload + "\n" \
"# base64 encoded original payload:", base64.b64encode(spa_payload)
print("# start tests with payload: %s" % spa_payload)
print("# base64 encoded original payload: %s" % to_b64e_str(spa_payload))

### valid payload tests - all digest types
pkt_id = valid_payloads(args, spa_payload, payload_num, pkt_id)
Expand Down Expand Up @@ -112,7 +114,7 @@ def main():

def field_fuzzing(args, spa_payload, payload_num, pkt_id):

print "# payload " + str(payload_num) + " (" + spa_payload + ") field fuzzing..."
print("# payload %s (%s) field fuzzing..." % (payload_num, spa_payload))

repl_start = 0
repl_end = 0
Expand All @@ -139,34 +141,34 @@ def field_fuzzing(args, spa_payload, payload_num, pkt_id):
### field and permute it in various ways

### truncation
for l in range(1, len(decoded)):
pkt_id = write_fuzzing_payload(field_num, decoded[:l], \
for length in range(1, len(decoded)):
pkt_id = write_fuzzing_payload(field_num, decoded[:length], \
orig_field, repl_start, repl_end, spa_payload, \
pkt_id, idx)
pkt_id = write_fuzzing_payload(field_num, decoded[l:], \
pkt_id = write_fuzzing_payload(field_num, decoded[length:], \
orig_field, repl_start, repl_end, spa_payload, \
pkt_id, idx)

### remove chunks
for bl in range(1, len(decoded)):
for l in range(0, bl):
fuzzing_field = decoded[:l] + decoded[l+bl:]
for length in range(0, bl):
fuzzing_field = decoded[:length] + decoded[length+bl:]
pkt_id = write_fuzzing_payload(field_num, fuzzing_field, \
orig_field, repl_start, repl_end, spa_payload, \
pkt_id, idx)

### append/prepend data
for l in [1, 10, 50, 127, 128, 129, 200, 399, \
for length in [1, 10, 50, 127, 128, 129, 200, 399, \
400, 401, 500, 800, 1000, 1023, 1024, 1025, \
1200, 1499, 1500, 1501, 2000]:
for non_ascii in range(0, 5) + range(127, 130) + range(252, 256):
for non_ascii in itertools.chain(range(0, 5), range(127, 130), range(252, 256)):
new_data = ''
for p in range(0, l):
for p in range(0, length):
new_data += chr(non_ascii)
pkt_id = write_fuzzing_payload(field_num, decoded + new_data, \
pkt_id = write_fuzzing_payload(field_num, str(decoded) + str(new_data), \
orig_field, repl_start, repl_end, spa_payload, \
pkt_id, idx)
pkt_id = write_fuzzing_payload(field_num, new_data + decoded, \
pkt_id = write_fuzzing_payload(field_num, str(new_data) + str(decoded), \
orig_field, repl_start, repl_end, spa_payload, \
pkt_id, idx)

Expand All @@ -180,20 +182,20 @@ def field_fuzzing(args, spa_payload, payload_num, pkt_id):

### embedded chars
for pos in range(0, len(decoded)):
for c in range(0, 31) + range(44, 48) + range(127, 131) + range(253, 255):
for c in itertools.chain(range(0, 31), range(44, 48), range(127, 131), range(253, 255)):
fuzzing_field = list(decoded)
fuzzing_field[pos] = chr(c)
pkt_id = write_fuzzing_payload(field_num, str(fuzzing_field), \
orig_field, repl_start, repl_end, spa_payload, \
pkt_id, idx)

### now generate fuzzing data for this field
for c in range(0, 3) + range(33, 47) + range(65, 67) + range(127, 130) + range(252, 256):
for l in [1, 2, 3, 4, 5, 6, 10, 14, 15, 16, 17, 24, 31, 32, 33, \
for c in itertools.chain(range(0, 3), range(33, 47), range(65, 67), range(127, 130), range(252, 256)):
for length in [1, 2, 3, 4, 5, 6, 10, 14, 15, 16, 17, 24, 31, 32, 33, \
63, 64, 127, 128, 129, 150, 220, 230, 254, 255, 256, 257, 258]:

fuzzing_field = ''
for n in range(0, l):
for n in range(0, length):
fuzzing_field += chr(c)
pkt_id = write_fuzzing_payload(field_num, fuzzing_field, \
orig_field, repl_start, repl_end, spa_payload, \
Expand Down Expand Up @@ -222,8 +224,8 @@ def write_fuzzing_payload(field_num, fuzzing_field, orig_field, \
new_payloads[i] += spa_payload[repl_end:]

for s in new_payloads:
print str(pkt_id), str(spa_failure), str(do_digest), \
str(spa_sha256), base64.b64encode(s)
print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \
spa_sha256, to_b64e_str(s)))
pkt_id += 1

return pkt_id
Expand All @@ -246,12 +248,12 @@ def field_variants(new_payloads, fuzzing_field, orig_field, require_b64):
if require_b64:
decoded_orig_field = spa_base64_decode(orig_field)
new_payloads[0] += spa_base64_encode(fuzzing_field)
new_payloads[1] += spa_base64_encode(fuzzing_field+decoded_orig_field)
new_payloads[2] += spa_base64_encode(decoded_orig_field+fuzzing_field)
new_payloads[1] += spa_base64_encode(str(fuzzing_field) + str(decoded_orig_field))
new_payloads[2] += spa_base64_encode(str(decoded_orig_field) + str(fuzzing_field))
else:
new_payloads[0] += fuzzing_field
new_payloads[1] += fuzzing_field+orig_field
new_payloads[2] += orig_field+fuzzing_field
new_payloads[1] += str(fuzzing_field) + str(orig_field)
new_payloads[2] += str(orig_field) +str(fuzzing_field)
return

def spa_base64_decode(b64str):
Expand All @@ -264,93 +266,97 @@ def spa_base64_decode(b64str):

def spa_base64_encode(nonb64str):
### strip '=' chars like fwknop does
return base64.b64encode(nonb64str).replace('=', '')
return to_b64e_str(nonb64str).replace('=', '')

def to_b64e_str(s):
    """Base64-encode str(s) and return the result as a str (Python3 helper)."""
    encoded = base64.b64encode(str(s).encode('utf-8'))
    return encoded.decode('utf-8')

def valid_payloads(args, spa_payload, payload_num, pkt_id):
print "# payload " + str(payload_num) + " (" + spa_payload + ") valid payload + valid digest types..."
print("# payload %s (%s) valid payload + valid digest types..." % (payload_num, spa_payload))
for digest_type in range(0, 6):
print str(pkt_id), str(spa_success), str(do_digest), \
str(digest_type), base64.b64encode(spa_payload)
print("%s %s %s %s %s" % (pkt_id, spa_success, do_digest, \
digest_type, to_b64e_str(spa_payload)))
pkt_id += 1
return pkt_id

def invalid_digest_types(args, spa_payload, payload_num, pkt_id):
print "# payload " + str(payload_num) + " (" + spa_payload + ") invalid digest types..."
print("# payload %s (%s) invalid digest types..." % (payload_num, spa_payload))
for digest_type in [-1, 6, 7]:
print str(pkt_id), str(spa_success), str(do_digest), \
str(digest_type), base64.b64encode(spa_payload)
print("%s %s %s %s %s" % (pkt_id, spa_success, do_digest, \
digest_type, to_b64e_str(spa_payload)))
pkt_id += 1
return pkt_id

def truncated_lengths(args, spa_payload, payload_num, pkt_id):
print "# payload " + str(payload_num) + " (" + spa_payload + ") truncated lengths..."
for l in range(1, len(spa_payload)):
print str(pkt_id), str(spa_failure), str(do_digest), \
str(spa_sha256), base64.b64encode(spa_payload[:l])
print("# payload %s (%s) truncated lengths..." % (payload_num, spa_payload))
for length in range(1, len(spa_payload)):
print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \
spa_sha256, to_b64e_str(spa_payload[:length])))
pkt_id += 1
for l in range(1, len(spa_payload)):
print str(pkt_id), str(spa_failure), str(do_digest), \
str(spa_sha256), base64.b64encode(spa_payload[l:])
for length in range(1, len(spa_payload)):
print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \
spa_sha256, to_b64e_str(spa_payload[length:])))
pkt_id += 1

return pkt_id

def rm_chunks(args, spa_payload, payload_num, pkt_id):
print "# payload " + str(payload_num) + " (" + spa_payload + ") splice blocks of chars..."
print("# payload %s (%s) splice blocks of chars..." % (payload_num, spa_payload))
for bl in range(1, 20):
for l in range(0, len(spa_payload)):
new_payload = spa_payload[:l] + spa_payload[l+bl:]
print str(pkt_id), str(spa_failure), str(do_digest), \
str(spa_sha256), base64.b64encode(new_payload)
for length in range(0, len(spa_payload)):
new_payload = spa_payload[:length] + spa_payload[length+bl:]
print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \
spa_sha256, to_b64e_str(new_payload)))
pkt_id += 1
return pkt_id

def data_extensions(args, spa_payload, payload_num, pkt_id):
print "# payload " + str(payload_num) + " (" + spa_payload + ") payloads too long..."
for l in [1, 10, 50, 127, 128, 129, 200, 399, \
print("# payload %s (%s) payloads too long..." % (payload_num, spa_payload))
for length in [1, 10, 50, 127, 128, 129, 200, 399, \
400, 401, 500, 800, 1000, 1023, 1024, 1025, \
1200, 1499, 1500, 1501, 2000]:
for non_ascii in range(0, 5) + range(127, 130) + range(252, 256):
for non_ascii in itertools.chain(range(0, 5), range(127, 130), range(252, 256)):
new_data = ''
for p in range(0, l):
for p in range(0, length):
new_data += chr(non_ascii)
### append
print str(pkt_id), str(spa_failure), str(do_digest), \
str(spa_sha256), base64.b64encode(spa_payload + new_data)
print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \
spa_sha256, to_b64e_str(spa_payload + new_data)))
pkt_id += 1
### prepend
print str(pkt_id), str(spa_failure), str(do_digest), \
str(spa_sha256), base64.b64encode(new_data + spa_payload)
print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \
spa_sha256, to_b64e_str(new_data + spa_payload)))
pkt_id += 1
return pkt_id

def embedded_separators(args, spa_payload, payload_num, pkt_id):
print "# payload " + str(payload_num) + " (" + spa_payload + ") additional embedded : chars..."
print("# payload %s (%s) additional embedded ':' chars..." % (payload_num, spa_payload))
for pos in range(0, len(spa_payload)):
if spa_payload[pos] == ':':
continue
new_payload = list(spa_payload)
new_payload[pos] = ':'
print str(pkt_id), str(spa_failure), str(do_digest), \
str(spa_sha256), base64.b64encode(''.join(new_payload))
print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \
spa_sha256, to_b64e_str(''.join(new_payload))))
pkt_id += 1
return pkt_id

def embedded_chars(args, spa_payload, payload_num, pkt_id):
print "# payload " + str(payload_num) + " (" + spa_payload + ") non-ascii char tests..."
print("# payload %s (%s) non-ascii char tests..." % (payload_num, spa_payload))
for pos in range(0, len(spa_payload)):
for c in range(0, 31) + range(44, 48) + range(127, 131) + range(253, 255):
for c in itertools.chain(range(0, 31), range(44, 48), range(127, 131), range(253, 255)):
new_payload = list(spa_payload)
new_payload[pos] = chr(c)
### write out the fuzzing line
print str(pkt_id), str(spa_failure), str(do_digest), \
str(spa_sha256), base64.b64encode(''.join(new_payload))
print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \
spa_sha256, to_b64e_str(''.join(new_payload))))
pkt_id += 1
return pkt_id

def print_hdr():
print "#\n# This file was generated by the fwknop SPA packet fuzzer test/spa_fuzzing.py...\n#\n" \
"# <pkt_ID> <status: success|fail> <digest: yes|no> <digest type> <base64_SPA_payload>\n#\n"
print("#\n# This file was generated by the fwknop SPA packet fuzzer test/spa_fuzzing.py...\n#\n" \
"# <pkt_ID> <status: success|fail> <digest: yes|no> <digest type> <base64_SPA_payload>\n#\n")
return

def parse_cmdline():
Expand Down

0 comments on commit 0bba17c

Please sign in to comment.