diff --git a/ChangeLog b/ChangeLog index e3f106b9..cbc75c1a 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,4 +1,4 @@ -fwknop-2.6.11 (12/09/2019): +fwknop-2.6.11 (02//2024): - [server] (Amin Massad) Fixed two bugs in PF handling code - one for indefinitely repeating error messages "Did not find expire comment in rules list 0" in rule deletion code, and the second where min_exp was @@ -18,6 +18,7 @@ fwknop-2.6.11 (12/09/2019): Ubuntu 22.04 for example. - [test suite] Prefer the 'ip' command over the older 'ifconfig' command for interface operations and loopback detection. + - [test suite] Update the 'spa_fuzzing.py' fuzzer to use Python3. fwknop-2.6.10 (08/06/2018): - [server] Add MAX_FW_TIMEOUT to access.conf stanzas to allow a maximum diff --git a/test/spa_fuzzing.py b/test/spa_fuzzing.py index e4f7204f..e5d02307 100755 --- a/test/spa_fuzzing.py +++ b/test/spa_fuzzing.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # # Purpose: This script generates SPA packet payloads that are designed to # act as a fuzzer against libfko SPA decoding routines. @@ -16,6 +16,7 @@ # 1716411011200157:cm9vdA:1397329899:2.0.1:1:MTI3LjAuMC4yLHRjcC8yMw # +import itertools import base64 import argparse @@ -32,6 +33,7 @@ def main(): print_hdr() + ### these are the base payloads that are mutated by the fuzzer spa_payloads = [ # type 1: normal access request @@ -81,8 +83,8 @@ def main(): payload_num += 1 - print "# start tests with payload: ", spa_payload + "\n" \ - "# base64 encoded original payload:", base64.b64encode(spa_payload) + print("# start tests with payload: %s" % spa_payload) + print("# base64 encoded original payload: %s" % to_b64e_str(spa_payload)) ### valid payload tests - all digest types pkt_id = valid_payloads(args, spa_payload, payload_num, pkt_id) @@ -112,7 +114,7 @@ def main(): def field_fuzzing(args, spa_payload, payload_num, pkt_id): - print "# payload " + str(payload_num) + " (" + spa_payload + ") field fuzzing..." + print("# payload %s (%s) field fuzzing..." % (payload_num, spa_payload)) repl_start = 0 repl_end = 0 @@ -139,34 +141,34 @@ def field_fuzzing(args, spa_payload, payload_num, pkt_id): ### field and permute it in various ways ### truncation - for l in range(1, len(decoded)): - pkt_id = write_fuzzing_payload(field_num, decoded[:l], \ + for length in range(1, len(decoded)): + pkt_id = write_fuzzing_payload(field_num, decoded[:length], \ orig_field, repl_start, repl_end, spa_payload, \ pkt_id, idx) - pkt_id = write_fuzzing_payload(field_num, decoded[l:], \ + pkt_id = write_fuzzing_payload(field_num, decoded[length:], \ orig_field, repl_start, repl_end, spa_payload, \ pkt_id, idx) ### remove chunks for bl in range(1, len(decoded)): - for l in range(0, bl): - fuzzing_field = decoded[:l] + decoded[l+bl:] + for length in range(0, bl): + fuzzing_field = decoded[:length] + decoded[length+bl:] pkt_id = write_fuzzing_payload(field_num, fuzzing_field, \ orig_field, repl_start, repl_end, spa_payload, \ pkt_id, idx) ### append/prepend data - for l in [1, 10, 50, 127, 128, 129, 200, 399, \ + for length in [1, 10, 50, 127, 128, 129, 200, 399, \ 400, 401, 500, 800, 1000, 1023, 1024, 1025, \ 1200, 1499, 1500, 1501, 2000]: - for non_ascii in range(0, 5) + range(127, 130) + range(252, 256): + for non_ascii in itertools.chain(range(0, 5), range(127, 130), range(252, 256)): new_data = '' - for p in range(0, l): + for p in range(0, length): new_data += chr(non_ascii) - pkt_id = write_fuzzing_payload(field_num, decoded + new_data, \ + pkt_id = write_fuzzing_payload(field_num, str(decoded) + str(new_data), \ orig_field, repl_start, repl_end, spa_payload, \ pkt_id, idx) - pkt_id = write_fuzzing_payload(field_num, new_data + decoded, \ + pkt_id = write_fuzzing_payload(field_num, str(new_data) + str(decoded), \ orig_field, repl_start, repl_end, spa_payload, \ pkt_id, idx) @@ -180,7 +182,7 @@ def field_fuzzing(args, spa_payload, payload_num, pkt_id): ### embedded chars for pos in range(0, len(decoded)): - for c in range(0, 31) + range(44, 48) + range(127, 131) + range(253, 255): + for c in itertools.chain(range(0, 31), range(44, 48), range(127, 131), range(253, 255)): fuzzing_field = list(decoded) fuzzing_field[pos] = chr(c) pkt_id = write_fuzzing_payload(field_num, str(fuzzing_field), \ @@ -188,12 +190,12 @@ def field_fuzzing(args, spa_payload, payload_num, pkt_id): pkt_id, idx) ### now generate fuzzing data for this field - for c in range(0, 3) + range(33, 47) + range(65, 67) + range(127, 130) + range(252, 256): - for l in [1, 2, 3, 4, 5, 6, 10, 14, 15, 16, 17, 24, 31, 32, 33, \ + for c in itertools.chain(range(0, 3), range(33, 47), range(65, 67), range(127, 130), range(252, 256)): + for length in [1, 2, 3, 4, 5, 6, 10, 14, 15, 16, 17, 24, 31, 32, 33, \ 63, 64, 127, 128, 129, 150, 220, 230, 254, 255, 256, 257, 258]: fuzzing_field = '' - for n in range(0, l): + for n in range(0, length): fuzzing_field += chr(c) pkt_id = write_fuzzing_payload(field_num, fuzzing_field, \ orig_field, repl_start, repl_end, spa_payload, \ @@ -222,8 +224,8 @@ def write_fuzzing_payload(field_num, fuzzing_field, orig_field, \ new_payloads[i] += spa_payload[repl_end:] for s in new_payloads: - print str(pkt_id), str(spa_failure), str(do_digest), \ - str(spa_sha256), base64.b64encode(s) + print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \ + spa_sha256, to_b64e_str(s))) pkt_id += 1 return pkt_id @@ -246,12 +248,12 @@ def field_variants(new_payloads, fuzzing_field, orig_field, require_b64): if require_b64: decoded_orig_field = spa_base64_decode(orig_field) new_payloads[0] += spa_base64_encode(fuzzing_field) - new_payloads[1] += spa_base64_encode(fuzzing_field+decoded_orig_field) - new_payloads[2] += spa_base64_encode(decoded_orig_field+fuzzing_field) + new_payloads[1] += spa_base64_encode(str(fuzzing_field) + str(decoded_orig_field)) + new_payloads[2] += spa_base64_encode(str(decoded_orig_field) + str(fuzzing_field)) else: new_payloads[0] += fuzzing_field - new_payloads[1] += fuzzing_field+orig_field - new_payloads[2] += orig_field+fuzzing_field + new_payloads[1] += str(fuzzing_field) + str(orig_field) + new_payloads[2] += str(orig_field) +str(fuzzing_field) return def spa_base64_decode(b64str): @@ -264,93 +266,97 @@ def spa_base64_decode(b64str): def spa_base64_encode(nonb64str): ### strip '=' chars like fwknop does - return base64.b64encode(nonb64str).replace('=', '') + return to_b64e_str(nonb64str).replace('=', '') + +def to_b64e_str(s): + b = base64.b64encode(bytes(str(s), 'utf-8')) + return b.decode('utf-8') def valid_payloads(args, spa_payload, payload_num, pkt_id): - print "# payload " + str(payload_num) + " (" + spa_payload + ") valid payload + valid digest types..." + print("# payload %s (%s) valid payload + valid digest types..." % (payload_num, spa_payload)) for digest_type in range(0, 6): - print str(pkt_id), str(spa_success), str(do_digest), \ - str(digest_type), base64.b64encode(spa_payload) + print("%s %s %s %s %s" % (pkt_id, spa_success, do_digest, \ + digest_type, to_b64e_str(spa_payload))) pkt_id += 1 return pkt_id def invalid_digest_types(args, spa_payload, payload_num, pkt_id): - print "# payload " + str(payload_num) + " (" + spa_payload + ") invalid digest types..." + print("# payload %s (%s) invalid digest types..." % (payload_num, spa_payload)) for digest_type in [-1, 6, 7]: - print str(pkt_id), str(spa_success), str(do_digest), \ - str(digest_type), base64.b64encode(spa_payload) + print("%s %s %s %s %s" % (pkt_id, spa_success, do_digest, \ + digest_type, to_b64e_str(spa_payload))) pkt_id += 1 return pkt_id def truncated_lengths(args, spa_payload, payload_num, pkt_id): - print "# payload " + str(payload_num) + " (" + spa_payload + ") truncated lengths..." - for l in range(1, len(spa_payload)): - print str(pkt_id), str(spa_failure), str(do_digest), \ - str(spa_sha256), base64.b64encode(spa_payload[:l]) + print("# payload %s (%s) truncated lengths..." % (payload_num, spa_payload)) + for length in range(1, len(spa_payload)): + print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \ + spa_sha256, to_b64e_str(spa_payload[:length]))) pkt_id += 1 - for l in range(1, len(spa_payload)): - print str(pkt_id), str(spa_failure), str(do_digest), \ - str(spa_sha256), base64.b64encode(spa_payload[l:]) + for length in range(1, len(spa_payload)): + print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \ + spa_sha256, to_b64e_str(spa_payload[length:]))) pkt_id += 1 return pkt_id def rm_chunks(args, spa_payload, payload_num, pkt_id): - print "# payload " + str(payload_num) + " (" + spa_payload + ") splice blocks of chars..." + print("# payload %s (%s) splice blocks of chars..." % (payload_num, spa_payload)) for bl in range(1, 20): - for l in range(0, len(spa_payload)): - new_payload = spa_payload[:l] + spa_payload[l+bl:] - print str(pkt_id), str(spa_failure), str(do_digest), \ - str(spa_sha256), base64.b64encode(new_payload) + for length in range(0, len(spa_payload)): + new_payload = spa_payload[:length] + spa_payload[length+bl:] + print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \ + spa_sha256, to_b64e_str(new_payload))) pkt_id += 1 return pkt_id def data_extensions(args, spa_payload, payload_num, pkt_id): - print "# payload " + str(payload_num) + " (" + spa_payload + ") payloads too long..." - for l in [1, 10, 50, 127, 128, 129, 200, 399, \ + print("# payload %s (%s) payloads too long..." % (payload_num, spa_payload)) + for length in [1, 10, 50, 127, 128, 129, 200, 399, \ 400, 401, 500, 800, 1000, 1023, 1024, 1025, \ 1200, 1499, 1500, 1501, 2000]: - for non_ascii in range(0, 5) + range(127, 130) + range(252, 256): + for non_ascii in itertools.chain(range(0, 5), range(127, 130), range(252, 256)): new_data = '' - for p in range(0, l): + for p in range(0, length): new_data += chr(non_ascii) ### append - print str(pkt_id), str(spa_failure), str(do_digest), \ - str(spa_sha256), base64.b64encode(spa_payload + new_data) + print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \ + spa_sha256, to_b64e_str(spa_payload + new_data))) pkt_id += 1 ### prepend - print str(pkt_id), str(spa_failure), str(do_digest), \ - str(spa_sha256), base64.b64encode(new_data + spa_payload) + print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \ + spa_sha256, to_b64e_str(new_data + spa_payload))) pkt_id += 1 return pkt_id def embedded_separators(args, spa_payload, payload_num, pkt_id): - print "# payload " + str(payload_num) + " (" + spa_payload + ") additional embedded : chars..." + print("# payload %s (%s) additional embedded ':' chars..." % (payload_num, spa_payload)) for pos in range(0, len(spa_payload)): if spa_payload[pos] == ':': continue new_payload = list(spa_payload) new_payload[pos] = ':' - print str(pkt_id), str(spa_failure), str(do_digest), \ - str(spa_sha256), base64.b64encode(''.join(new_payload)) + print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \ + spa_sha256, to_b64e_str(''.join(new_payload)))) pkt_id += 1 return pkt_id def embedded_chars(args, spa_payload, payload_num, pkt_id): - print "# payload " + str(payload_num) + " (" + spa_payload + ") non-ascii char tests..." + print("# payload %s (%s) non-ascii char tests..." % (payload_num, spa_payload)) for pos in range(0, len(spa_payload)): - for c in range(0, 31) + range(44, 48) + range(127, 131) + range(253, 255): + for c in itertools.chain(range(0, 31), range(44, 48), range(127, 131), range(253, 255)): new_payload = list(spa_payload) new_payload[pos] = chr(c) ### write out the fuzzing line - print str(pkt_id), str(spa_failure), str(do_digest), \ - str(spa_sha256), base64.b64encode(''.join(new_payload)) + print("%s %s %s %s %s" % (pkt_id, spa_failure, do_digest, \ + spa_sha256, to_b64e_str(''.join(new_payload)))) pkt_id += 1 return pkt_id def print_hdr(): - print "#\n# This file was generated by the fwknop SPA packet fuzzer test/spa_fuzzing.py...\n#\n" \ - "# \n#\n" + print("#\n# This file was generated by the fwknop SPA packet fuzzer test/spa_fuzzing.py...\n#\n" \ + "# \n#\n") return def parse_cmdline():