diff --git a/tests/test_strategies.py b/tests/test_strategies.py index f4b12bf216..b98345a537 100644 --- a/tests/test_strategies.py +++ b/tests/test_strategies.py @@ -7,6 +7,8 @@ @pytest.mark.parametrize( "chunks, expected, explanation", [ + (None, [], "None as chunks (No chunk found)"), + ([], [], "Empty list as chunks (No chunk found)"), ( [ Chunk(1, 2), diff --git a/unblob/strategies.py b/unblob/strategies.py index 61d98f74a8..3bd5de0af6 100644 --- a/unblob/strategies.py +++ b/unblob/strategies.py @@ -1,6 +1,6 @@ import io import itertools -from operator import attrgetter +from operator import attrgetter, itemgetter from pathlib import Path from typing import Generator, List @@ -16,7 +16,7 @@ logger = get_logger() -def search_chunks_by_priority( +def search_chunks_by_priority( # noqa: C901 path: Path, file: io.BufferedReader, file_size: int ) -> List[Chunk]: all_chunks = [] @@ -31,7 +31,8 @@ def search_chunks_by_priority( for result in yara_results: handler = result.handler match = result.match - for offset, identifier, string_data in match.strings: + sorted_matches = sorted(match.strings, key=itemgetter(0), reverse=True) + for offset, identifier, string_data in sorted_matches: logger.info( "Calculating chunk for YARA match", start_offset=format_hex(offset), @@ -54,17 +55,31 @@ def search_chunks_by_priority( log.info("Found valid chunk") all_chunks.append(chunk) + if chunk.size == file_size: + log.info("This Chunk represents the whole file") + return [chunk] + return all_chunks def remove_inner_chunks(chunks: List[Chunk]): """Remove all chunks from the list which are within another bigger chunks.""" + if not chunks: + return [] + chunks_by_size = sorted(chunks, key=attrgetter("size"), reverse=True) outer_chunks = [chunks_by_size[0]] for chunk in chunks_by_size[1:]: if not any(outer.contains(chunk) for outer in outer_chunks): outer_chunks.append(chunk) - logger.info("Removed inner chunks", outer_chunk_count=len(outer_chunks)) + + outer_count = len(outer_chunks) + removed_count = len(chunks) - outer_count + logger.info( + "Removed inner chunks", + outer_chunk_count=outer_count, + removed_inner_chunk_count=removed_count, + ) return outer_chunks @@ -119,9 +134,6 @@ def extract_with_priority( with path.open("rb") as file: all_chunks = search_chunks_by_priority(path, file, file_size) - if not all_chunks: - return - outer_chunks = remove_inner_chunks(all_chunks) unknown_chunks = calculate_unknown_chunks(outer_chunks, file_size) if unknown_chunks: