# The Representation of an Algorithm Impacts its Maliciousness

In [48]:
from collections import defaultdict
from dataclasses import dataclass
from itertools import combinations_with_replacement
import os
from pathlib import Path
from pprint import pprint
import shutil
import statistics
import subprocess
import typing as tp

In [43]:
# Experiment axes: one configuration is an (algorithm, encryption, packing) triple.
ALGORITHM = ("heap", "merge", "quick")
ENCRYPTION = ("openssl", "cryptoppe", "tomcrypt")
PACKING = ("none", "upx", "exepacker")
# Sources are looked up beneath INPUT_ROOT; generated artifacts go beneath OUTPUT_ROOT.
INPUT_ROOT = Path(".")
OUTPUT_ROOT = Path("./outputs/concept")
# Anaconda install prefix, used to build the OpenSSL include/library directories.
# Every backslash is escaped: the previous "\L" was an invalid escape sequence
# that only worked by accident (and warns on recent Python versions).
ANACONDA = "C:\\Users\\Luke\\anaconda3\\"

In [32]:
class InputHelper:
    """Resolve input locations for one (algorithm, encryption) pair.

    All paths are derived from the module-level ``INPUT_ROOT``.
    """

    def __init__(self, algorithm: str, encryption: str) -> None:
        self.algorithm, self.encryption = algorithm, encryption

    @property
    def root(self) -> Path:
        """Top-level directory under which the inputs live."""
        return INPUT_ROOT

    @property
    def path(self) -> Path:
        """Directory ``<root>/<algorithm>/<encryption>``."""
        algorithm_dir = self.root / self.algorithm
        return algorithm_dir / self.encryption

    @property
    def source_file(self) -> Path:
        """The ``main.cpp`` file inside :attr:`path`."""
        return self.path.joinpath("main.cpp")


class OutputHelper:
    """Resolve output locations for one (algorithm, encryption, packing) configuration.

    All paths are derived from the module-level ``OUTPUT_ROOT``.
    """

    def __init__(self, algorithm: str, encryption: str, packing: str) -> None:
        self.algorithm = algorithm
        self.encryption = encryption
        self.packing = packing

    def mkdir(self, exist_ok: bool = False, parents: bool = False) -> None:
        """Create the binaries directory of this configuration.

        Args:
            exist_ok: Forwarded to ``Path.mkdir``; do not fail if the directory exists.
            parents: Forwarded to ``Path.mkdir``; create missing parent directories.
        """
        # The previous body used ``source_path`` and ``binary_path``, neither of
        # which is defined on this class, so every call raised AttributeError.
        # ``binaries_path`` is the only directory this helper exposes.
        self.binaries_path.mkdir(exist_ok=exist_ok, parents=parents)

    @property
    def root(self) -> Path:
        """Top-level directory under which all outputs live."""
        return OUTPUT_ROOT

    @property
    def path(self) -> Path:
        """Directory ``<root>/<algorithm>/<encryption>/<packing>``."""
        return self.root / self.algorithm / self.encryption / self.packing

    @property
    def binaries_path(self) -> Path:
        """Directory holding the binaries of this configuration."""
        return self.path / "binaries"

    @property
    def binaries(self) -> tp.List[Path]:
        """Every entry currently inside :attr:`binaries_path`, in directory order."""
        return list(self.binaries_path.iterdir())

## Create Diverse Executables

This section must be run on a Windows machine.

In [37]:
def _get_compiler_flags() -> tp.List[tp.List[str]]:
    mixxy = [
        "-fno-inline",
        "-fno-function-cse",
        "-fno-guess-branch-probability",
        "-fno-tree-loop-distribute-patterns",
    ]
    flags = [tuple(set(i)) for i in combinations_with_replacement(mixxy, len(mixxy))]
    flags = list(list(i) for i in set(flags))
    flags += [["-O0"], ["-O2"], ["-O3"]]
    return flags


class Compiler:
    """Build one source file with ``g++`` once per flag set in ``flag_options``."""

    # Computed once when the class is created and shared by all instances.
    flag_options = _get_compiler_flags()

    def __init__(self, source: tp.Union[str, Path], algorithm: str, encryption: str) -> None:
        """Prepare the fixed part of the compiler command line.

        Args:
            source: Path of the main translation unit.
            algorithm: Algorithm name; ``<algorithm>.cpp`` is compiled alongside ``source``.
            encryption: Encryption backend; only ``"openssl"`` adds extra arguments.
        """
        self.source = Path(source)
        self.flags = [f"{algorithm}.cpp"]
        if encryption == "openssl":
            # The arguments go to g++ as a list with no shell in between, so
            # quote characters would become part of the path. They are
            # therefore left out here.
            self.flags.extend([
                "opensslencryption.cpp",
                f"-I{ANACONDA}include",
                f"-L{ANACONDA}libs",
                "-lssl",
                "-lcrypto",
            ])
        # "cryptoppe" and "tomcrypt" currently add no extra arguments.

    def __call__(self, out: Path) -> tp.List[Path]:
        """Compile the source once per flag set, writing outputs into ``out``.

        Output ``i`` is built with ``flag_options[i]`` and requested at
        ``out / str(i)``. Any compiler diagnostics are printed together with
        the command that produced them.

        Args:
            out: Existing directory that receives the outputs.

        Returns:
            The requested output paths of all builds whose ``g++`` exit code was 0.
        """
        successes = []
        for i, var_flags in enumerate(self.flag_options):
            target = out / str(i)
            args = [
                "g++",
                self.source.as_posix(),
                *self.flags,
                "-o",
                target.as_posix(),
                *var_flags,
            ]
            result = subprocess.run(args, capture_output=True, text=True)
            if result.stderr:
                print(" ".join(args), f"\n{result.stderr}")
            # Judge success by exit status: warnings also land on stderr but
            # still produce a binary.
            if result.returncode == 0:
                # NOTE(review): on Windows toolchains g++ may append ".exe" to
                # the output name — confirm these paths match the files on disk.
                successes.append(target)
        return successes

In [None]:
# Build every (algorithm, encryption) source into the binaries directory of
# each packing variant. One Compiler is reused across the packing variants.
for alg in ALGORITHM:
    for enc in ENCRYPTION:
        input_helper = InputHelper(alg, enc)
        compiler = Compiler(input_helper.source_file, alg, enc)
        for pck in PACKING:
            output_helper = OutputHelper(alg, enc, pck)
            # Create the directory tree up front; reruns must not fail on existing dirs.
            output_helper.mkdir(exist_ok=True, parents=True)
            compiler(output_helper.binaries_path)

# Apply Obfuscation

From this point on, the notebook is run on Linux.

In [54]:
from captum.attr import FeatureAblation
import lief
import torch
from torch import tensor

os.chdir("/home/lk3591/Documents/code/MalConv2")

from classifier import confidence_scores, forward_function_malconv, get_dataset_and_loader, get_model
from executable_helper import read_binary, stream_text_section_bounds
from explain import AttributeParams, get_algorithm_kwargs, get_explanation_algorithm, BASELINE, TARGET
from utils import batch

In [None]:
# TODO

## Analyze Executables

In [55]:
def get_confidence_scores_from_files(model, files: tp.List[Path]) -> tp.List[float]:
    """Score each file with ``model`` and return one value per file.

    Every file is loaded with ``read_binary``, converted to an ``int64``
    tensor and passed to ``confidence_scores``; the first element of that
    result is kept.

    Args:
        model: Model object forwarded unchanged to ``confidence_scores``.
        files: Files to score, processed in order.

    Returns:
        The kept values, in the same order as ``files``.
    """

    def _score(path: Path):
        raw = read_binary(path)
        encoded = tensor(raw, dtype=torch.int64)
        return confidence_scores(model, encoded)[0]

    return [_score(path) for path in files]

@dataclass
class Result:
    """Summary statistics for one (algorithm, encryption, packing) configuration.

    ``confidences`` and ``malratio`` are consumed in ``__post_init__``: their
    mean, median, maximum and minimum are stored in the ``conf_*`` and
    ``malratio_*`` fields, after which both raw fields are reset to ``None``.
    Values passed explicitly for the summary fields are overwritten.

    Raises:
        statistics.StatisticsError: If ``confidences`` or ``malratio`` is empty.
    """

    alg: str
    enc: str
    pck: str
    confidences: tp.Optional[tp.Iterable[float]]
    malratio: tp.Optional[tp.Iterable[float]]
    conf_mean: tp.Optional[float] = None
    conf_median: tp.Optional[float] = None
    conf_max: tp.Optional[float] = None
    conf_min: tp.Optional[float] = None
    malratio_mean: tp.Optional[float] = None
    malratio_median: tp.Optional[float] = None
    malratio_max: tp.Optional[float] = None
    malratio_min: tp.Optional[float] = None

    def __post_init__(self) -> None:
        # Materialise once so one-shot iterables (generators) can be scanned
        # by each of the statistics below.
        confidences = list(self.confidences)
        malratio = list(self.malratio)

        self.conf_mean = statistics.mean(confidences)
        self.conf_median = statistics.median(confidences)
        self.conf_max = max(confidences)
        self.conf_min = min(confidences)

        # These previously read a bare ``malratio`` name instead of the field,
        # and the minimum was computed with ``max``.
        self.malratio_mean = statistics.mean(malratio)
        self.malratio_median = statistics.median(malratio)
        self.malratio_max = max(malratio)
        self.malratio_min = min(malratio)

        # Keep only the summaries; drop the raw per-file values.
        self.confidences = None
        self.malratio = None

In [None]:
model = get_model("gct")
forward_function = forward_function_malconv(model, False)
explainer = FeatureAblation(forward_function)
# NOTE(review): params is built but never used below — confirm whether the
# attribution call is supposed to be driven by it.
params = AttributeParams(BASELINE, ".text", 64, None, None, None, None, None, None, TARGET)
results = []
# NOTE(review): batch_size is not defined in any visible cell; it must be
# assigned before this cell runs.
for alg in ALGORITHM:
    for enc in ENCRYPTION:
        for pck in PACKING:
            output_helper = OutputHelper(alg, enc, pck)
            dataset, loader = get_dataset_and_loader(
                None, output_helper.binaries, batch_size=batch_size
            )
            files = dataset.all_files

            confidences = get_confidence_scores_from_files(model, files)

            # Per file: share of the total attribution that falls inside the
            # bounds yielded by stream_text_section_bounds.
            malratio = []
            bounds = batch(stream_text_section_bounds(files), batch_size, len(files))
            for (_, lowers, uppers), (inputs, _) in zip(bounds, loader):
                # Baseline and target are two separate arguments; the previous
                # "BASELINE. TARGET" was an attribute access on BASELINE.
                attribs = explainer.attribute(inputs, baselines=BASELINE, target=TARGET)
                for a, l, u in zip(attribs, lowers, uppers):
                    # NOTE(review): yields nan/inf if the attributions sum to zero.
                    malratio.append((torch.sum(a[l:u]) / torch.sum(a)).item())

            result = Result(alg, enc, pck, confidences, malratio)
            results.append(result)

# Measure .text Section Maliciousness

## Swap .text Sections Confidence Scores

In [None]:
def swap_text_sections(f_source: Path, f_replace: Path) -> Path:
    """Write a variant of ``f_source`` that carries the ``.text`` section of ``f_replace``.

    Both files are parsed with ``lief``. The ``.text`` section of the source
    binary is removed and the one from the replacement binary is added. The
    result is rebuilt with ``lief.PE.Builder`` (imports rebuilt) and written
    next to ``f_source``.

    Args:
        f_source: Binary whose ``.text`` section is removed.
        f_replace: Binary that donates its ``.text`` section.

    Returns:
        Path of the written file: ``<source stem><replace stem>.exe`` in the
        directory of ``f_source``.
    """
    target = lief.parse(f_source.as_posix())
    donor = lief.parse(f_replace.as_posix())
    old_text = target.get_section(".text")
    new_text = donor.get_section(".text")
    target.remove(old_text)
    target.add_section(new_text)

    pe_builder = lief.PE.Builder(target)
    pe_builder.build_imports(True)
    pe_builder.build()

    out_name = f"{f_source.stem}{f_replace.stem}.exe"
    f_out = f_source.parent / out_name
    pe_builder.write(f_out.as_posix())
    return f_out

In [None]:
# Nested mapping: confidences[source stem][replacement stem] -> score of the
# binary produced by swapping in the replacement's .text section. It is
# (re)initialised here because an earlier cell binds ``confidences`` to a list.
confidences = defaultdict(dict)
# NOTE(review): executables and WINDOWS are not defined in any visible cell;
# both must be assigned before this cell runs.
for f_source in executables:
    for f_replace in [f for f in executables if f != f_source]:
        f_out = swap_text_sections(f_source, f_replace)
        # The scoring helper takes the model as its first argument; the
        # previous call omitted it.
        confidences[f_source.stem][f_replace.stem] = get_confidence_scores_from_files(model, [f_out])[0]
        if WINDOWS:
            subprocess.run(f"./{f_out}")

In [None]:
# Pretty-print the ``confidences`` object.
pprint(confidences)
# NOTE(review): max_and_min_confidences is not defined in any visible cell —
# confirm where it is supposed to come from.
max_and_min_confidences(confidences)

# Main