Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tests/test_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
@pytest.mark.parametrize(
"chunks, expected, explanation",
[
(None, [], "None as chunks (No chunk found)"),
([], [], "Empty list as chunks (No chunk found)"),
(
[
Chunk(1, 2),
Expand Down
26 changes: 19 additions & 7 deletions unblob/strategies.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import io
import itertools
from operator import attrgetter
from operator import attrgetter, itemgetter
from pathlib import Path
from typing import Generator, List

Expand All @@ -16,7 +16,7 @@
logger = get_logger()


def search_chunks_by_priority(
def search_chunks_by_priority( # noqa: C901
path: Path, file: io.BufferedReader, file_size: int
) -> List[Chunk]:
all_chunks = []
Expand All @@ -31,7 +31,8 @@ def search_chunks_by_priority(
for result in yara_results:
handler = result.handler
match = result.match
for offset, identifier, string_data in match.strings:
sorted_matches = sorted(match.strings, key=itemgetter(0), reverse=True)
for offset, identifier, string_data in sorted_matches:
logger.info(
"Calculating chunk for YARA match",
start_offset=format_hex(offset),
Expand All @@ -54,17 +55,31 @@ def search_chunks_by_priority(
log.info("Found valid chunk")
all_chunks.append(chunk)

if chunk.size == file_size:
log.info("This Chunk represents the whole file")
return [chunk]

return all_chunks


def remove_inner_chunks(chunks: List[Chunk]):
"""Remove all chunks from the list which are within another bigger chunks."""
if not chunks:
return []

chunks_by_size = sorted(chunks, key=attrgetter("size"), reverse=True)
outer_chunks = [chunks_by_size[0]]
for chunk in chunks_by_size[1:]:
if not any(outer.contains(chunk) for outer in outer_chunks):
outer_chunks.append(chunk)
logger.info("Removed inner chunks", outer_chunk_count=len(outer_chunks))

outer_count = len(outer_chunks)
removed_count = len(chunks) - outer_count
logger.info(
"Removed inner chunks",
outer_chunk_count=outer_count,
removed_inner_chunk_count=removed_count,
)
return outer_chunks


Expand Down Expand Up @@ -119,9 +134,6 @@ def extract_with_priority(

with path.open("rb") as file:
all_chunks = search_chunks_by_priority(path, file, file_size)
if not all_chunks:
return

outer_chunks = remove_inner_chunks(all_chunks)
unknown_chunks = calculate_unknown_chunks(outer_chunks, file_size)
if unknown_chunks:
Expand Down