diff --git a/.gitignore b/.gitignore index 8c6fa8665..ead7952c0 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,7 @@ lib/ # Test executables bin/ + +# PyCharm +.idea +venv \ No newline at end of file diff --git a/floss/function_argument_getter.py b/floss/function_argument_getter.py index 6ccdd278a..2386557f7 100644 --- a/floss/function_argument_getter.py +++ b/floss/function_argument_getter.py @@ -51,12 +51,12 @@ def __init__(self, vivisect_workspace): self.driver = viv_utils.emulator_drivers.FunctionRunnerEmulatorDriver(self.emu) self.index = viv_utils.InstructionFunctionIndex(vivisect_workspace) - def get_all_function_contexts(self, function_va): + def get_all_function_contexts(self, function_va, max_hits): self.d("Getting function context for function at 0x%08X...", function_va) all_contexts = [] for caller_va in self.get_caller_vas(function_va): - function_context = self.get_contexts_via_monitor(caller_va, function_va) + function_context = self.get_contexts_via_monitor(caller_va, function_va, max_hits) all_contexts.extend(function_context) self.d("Got %d function contexts for function at 0x%08X.", len(all_contexts), function_va) @@ -92,7 +92,7 @@ def get_caller_vas(self, function_va): caller_function_vas.add(caller_function_va) return caller_function_vas - def get_contexts_via_monitor(self, fva, target_fva): + def get_contexts_via_monitor(self, fva, target_fva, max_hits): """ run the given function while collecting arguments to a target function """ @@ -106,7 +106,7 @@ def get_contexts_via_monitor(self, fva, target_fva): monitor = CallMonitor(self.vivisect_workspace, target_fva) with installed_monitor(self.driver, monitor): with api_hooks.defaultHooks(self.driver): - self.driver.runFunction(self.index[fva], maxhit=1, maxrep=0x1000, func_only=True) + self.driver.runFunction(self.index[fva], maxhit=max_hits, maxrep=0x1000, func_only=True) contexts = monitor.get_contexts() self.d(" results:") @@ -116,5 +116,5 @@ def get_contexts_via_monitor(self, fva, 
target_fva): return contexts -def get_function_contexts(vw, fva): - return FunctionArgumentGetter(vw).get_all_function_contexts(fva) +def get_function_contexts(vw, fva, max_hits): + return FunctionArgumentGetter(vw).get_all_function_contexts(fva, max_hits) diff --git a/floss/main.py b/floss/main.py index 5c52b5369..2d8bfa127 100644 --- a/floss/main.py +++ b/floss/main.py @@ -55,7 +55,7 @@ def hex(i): return "0x%X" % (i) -def decode_strings(vw, decoding_functions_candidates, min_length, no_filter=False, max_instruction_count=20000): +def decode_strings(vw, decoding_functions_candidates, min_length, no_filter=False, max_instruction_count=20000, max_hits=1): """ FLOSS string decoding algorithm :param vw: vivisect workspace @@ -63,13 +63,14 @@ def decode_strings(vw, decoding_functions_candidates, min_length, no_filter=Fals :param min_length: minimum string length :param no_filter: do not filter decoded strings :param max_instruction_count: The maximum number of instructions to emulate per function. 
+ :param max_hits: The maximum number of hits per address :return: list of decoded strings ([DecodedString]) """ decoded_strings = [] function_index = viv_utils.InstructionFunctionIndex(vw) # TODO pass function list instead of identification manager for fva, _ in decoding_functions_candidates.get_top_candidate_functions(10): - for ctx in string_decoder.extract_decoding_contexts(vw, fva): + for ctx in string_decoder.extract_decoding_contexts(vw, fva, max_hits): for delta in string_decoder.emulate_decoding_routine(vw, function_index, fva, ctx, max_instruction_count): for delta_bytes in string_decoder.extract_delta_bytes(delta, ctx.decoded_at_va, fva): for decoded_string in string_decoder.extract_strings(delta_bytes, min_length, no_filter): @@ -146,7 +147,9 @@ def make_parser(): help="do not filter deobfuscated strings (may result in many false positive strings)", action="store_true") parser.add_option("--max-instruction-count", dest="max_instruction_count", type=int, default=20000, - help="maximum number of instructions to emulate per function") + help="maximum number of instructions to emulate per function (default is 20000)") + parser.add_option("--max-address-revisits", dest="max_address_revisits", type=int, default=0, + help="maximum number of address revisits per function (default is 0)") shellcode_group = OptionGroup(parser, "Shellcode options", "Analyze raw binary file containing shellcode") shellcode_group.add_option("-s", "--shellcode", dest="is_shellcode", help="analyze shellcode", @@ -420,7 +423,7 @@ def filter_unique_decoded(decoded_strings): unique_values = set() originals = [] for decoded in decoded_strings: - hashable = (decoded.va, decoded.s, decoded.decoded_at_va, decoded.fva) + hashable = (decoded.s, decoded.decoded_at_va, decoded.fva) if hashable not in unique_values: unique_values.add(hashable) originals.append(decoded) @@ -486,10 +489,10 @@ def print_decoding_results(decoded_strings, group_functions, quiet=False, expert :param quiet: print 
strings only, suppresses headers :param expert: expert mode """ - if not quiet: - print("\nFLOSS decoded %d strings" % len(decoded_strings)) if group_functions: + if not quiet: + print("\nFLOSS decoded %d strings" % len(decoded_strings)) fvas = set(map(lambda i: i.fva, decoded_strings)) for fva in fvas: grouped_strings = filter(lambda ds: ds.fva == fva, decoded_strings) @@ -499,6 +502,12 @@ def print_decoding_results(decoded_strings, group_functions, quiet=False, expert print("\nDecoding function at 0x%X (decoded %d strings)" % (fva, len_ds)) print_decoded_strings(grouped_strings, quiet=quiet, expert=expert) else: + if not expert: + seen = set() + decoded_strings = [x for x in decoded_strings if not (x.s in seen or seen.add(x.s))] + if not quiet: + print("\nFLOSS decoded %d strings" % len(decoded_strings)) + print_decoded_strings(decoded_strings, quiet=quiet, expert=expert) @@ -941,7 +950,10 @@ def main(argv=None): print_identification_results(sample_file_path, decoding_functions_candidates) floss_logger.info("Decoding strings...") - decoded_strings = decode_strings(vw, decoding_functions_candidates, min_length, options.no_filter, options.max_instruction_count) + decoded_strings = decode_strings(vw, decoding_functions_candidates, min_length, options.no_filter, + options.max_instruction_count, options.max_address_revisits + 1) + # TODO: The de-duplication process isn't perfect: it happens both here and in print_decoding_results, + # TODO: and in both places it operates on non-sanitized strings.
if not options.expert: decoded_strings = filter_unique_decoded(decoded_strings) print_decoding_results(decoded_strings, options.group_functions, quiet=options.quiet, expert=options.expert) diff --git a/floss/string_decoder.py b/floss/string_decoder.py index 8dda42cdd..cb5f750bd 100644 --- a/floss/string_decoder.py +++ b/floss/string_decoder.py @@ -91,7 +91,7 @@ def memdiff(bytes1, bytes2): return diffs -def extract_decoding_contexts(vw, function): +def extract_decoding_contexts(vw, function, max_hits): ''' Extract the CPU and memory contexts of all calls to the given function. Under the hood, we brute-force emulate all code paths to extract the @@ -101,9 +101,10 @@ def extract_decoding_contexts(vw, function): :param vw: The vivisect workspace in which the function is defined. :type function: int :param function: The address of the function whose contexts we'll find. + :param max_hits: The maximum number of hits per address :rtype: Sequence[function_argument_getter.FunctionContext] ''' - return get_function_contexts(vw, function) + return get_function_contexts(vw, function, max_hits) def emulate_decoding_routine(vw, function_index, function, context, max_instruction_count):