In [None]:
import rich.pretty

# Install rich's pretty printer so values displayed by the interactive session
# are rendered by rich.
rich.pretty.install()

# Populate the database (kim2023revisiting)

In [None]:
from evaluatie import models as m

In [None]:
import pathlib as pl
import pickle
import hashlib
import attrs

In [None]:
import sqlalchemy as sa

In [None]:
# Module-level database session; the cells below use it for every query and
# for inserting the newly created binaries.
session = m.Session()

In [None]:
@attrs.frozen
class BinaryPathParseResult:
    """Immutable record of the fields `parse_binary_path` extracts from a path.

    The examples in the field comments refer to the file name
    'binutils-2.30_gcc-8.2.0_x86_64_O0_nm.elf'.
    """

    # Package name without its version, e.g. 'binutils'.
    package_name: str
    # Text after the last '-' of the package field, e.g. '2.30'.
    package_version: str
    # Everything after the fifth '_'-separated field, e.g. 'nm'.
    executable_name: str
    # Compiler name, e.g. 'gcc'.
    compiler_backend: str
    # Compiler version, e.g. '8.2.0'.
    compiler_version: str
    # Optimisation level field, e.g. 'O0'.
    optimisation: str
    # Architecture field, e.g. 'x86'.
    architecture: str
    # Bitness field converted to int, e.g. 64.
    bitness: int
    # Build-variant flags derived from the dataset directory name.
    lto: bool
    noinline: bool
    pie: bool


def parse_binary_path(binary_path: pl.Path) -> BinaryPathParseResult:
    """Extract package, compiler and build-variant information from a binary path.

    Args:
        binary_path: Path of the form
            ``<dataset_dir>/<package_dir>/<file>.elf``. ``<dataset_dir>`` is
            either ``gnu_debug`` or ``gnu_debug_<variant>``; the file stem is
            ``<package>-<version>_<backend>-<version>_<arch>_<bits>_<opt>_<exe>``,
            e.g. 'binutils-2.30_gcc-8.2.0_x86_64_O0_nm.elf'.

    Returns:
        The parsed fields. ``lto``, ``noinline`` and ``pie`` are True only when
        the dataset directory is ``gnu_debug_lto``, ``gnu_debug_noinline`` or
        ``gnu_debug_pie`` respectively.

    Raises:
        IndexError: If the path has fewer than three components or the stem has
            fewer than five '_'-separated fields.
        ValueError: If the compiler field does not contain exactly one '-' or
            the bitness field is not an integer.
    """
    # binary_path.name looks like 'binutils-2.30_gcc-8.2.0_x86_64_O0_nm.elf'

    dataset_dir = binary_path.parts[-3]
    if dataset_dir == "gnu_debug":
        dataset = "normal"
    else:
        # After this, `dataset` is the bare variant name ('lto', 'noinline', ...).
        dataset = dataset_dir.removeprefix("gnu_debug_")

    fields = binary_path.stem.split("_")

    # The version is whatever follows the last '-', so package names that
    # themselves contain '-' stay intact.
    package_name, _, package_version = fields[0].rpartition("-")
    compiler_backend, compiler_version = fields[1].split("-")

    # Executable names may contain '_' themselves, so re-join the remainder.
    executable_name = "_".join(fields[5:])

    return BinaryPathParseResult(
        package_name=package_name,
        package_version=package_version,
        executable_name=executable_name,
        compiler_backend=compiler_backend,
        compiler_version=compiler_version,
        optimisation=fields[4],
        architecture=fields[2],
        bitness=int(fields[3]),
        # Compare against the bare variant name: the 'gnu_debug_' prefix was
        # already stripped above, so comparing with the prefixed directory name
        # could never match.
        lto=(dataset == "lto"),
        noinline=(dataset == "noinline"),
        pie=(dataset == "pie"),
    )


def build_parameters_to_dict(build_parameters: m.BuildParameters):
    """Return the identifying attributes of `build_parameters` as a plain dict.

    The keys are the attribute names, in the order listed below, and the values
    are read directly from the given object.
    """
    field_names = (
        "compiler_backend",
        "compiler_version",
        "optimisation",
        "architecture",
        "bitness",
        "lto",
        "noinline",
        "pie",
    )
    return {field: getattr(build_parameters, field) for field in field_names}


# Load every stored BuildParameters row and index it by the hash of its
# (field, value) pairs; build_parameters_from_binary_path_parse_result looks
# rows up with the same key scheme.
stmt = sa.select(m.BuildParameters)
existing_build_parameters = [*session.scalars(stmt)]
build_parameters_cache = dict(
    (hash(frozenset(build_parameters_to_dict(bp).items())), bp)
    for bp in existing_build_parameters
)


def build_parameters_from_binary_path_parse_result(parse_result: BinaryPathParseResult):
    """Return the shared `m.BuildParameters` object for a parsed binary path.

    The build-related fields of `parse_result` are looked up in the
    module-level `build_parameters_cache`, keyed by the hash of the frozenset of
    (field, value) pairs. On a miss a new `m.BuildParameters` is created, stored
    in the cache and returned; on a hit the cached object is returned as is.

    Args:
        parse_result: Result of `parse_binary_path`.

    Returns:
        The cached or newly created `m.BuildParameters` instance.
    """
    kwargs = {
        "compiler_backend": parse_result.compiler_backend,
        "compiler_version": parse_result.compiler_version,
        "optimisation": parse_result.optimisation,
        "architecture": parse_result.architecture,
        "bitness": parse_result.bitness,
        "lto": parse_result.lto,
        "noinline": parse_result.noinline,
        "pie": parse_result.pie,
    }
    key = hash(frozenset(kwargs.items()))

    # Only build the ORM object on a cache miss; dict.setdefault would evaluate
    # its default argument (and thus construct a throwaway instance) on every
    # call, including cache hits.
    if key not in build_parameters_cache:
        build_parameters_cache[key] = m.BuildParameters(**kwargs)
    return build_parameters_cache[key]

In [None]:
def functions_from_path(binary_path: pl.Path) -> tuple[list[m.Function], list]:
    """Load the function records stored next to a binary and wrap them as models.

    The records are unpickled from the file whose path is `binary_path` with
    'filtered2.pickle' appended directly to it.

    Returns:
        A pair of (one `m.Function` per record, the raw unpickled records).
    """
    pickle_path = pl.Path(f"{binary_path}filtered2.pickle")

    # NOTE: unpickling can execute arbitrary code; only use this on trusted
    # dataset files.
    with pickle_path.open("rb") as pickle_file:
        function_data_list = pickle.load(pickle_file)

    functions = [
        m.Function(
            # XXX What about 'demangled_name' and 'name'
            name=record["demangled_full_name"],
            lineno=record["src_line"],
            file=record["src_file"],
            size=record["size"],
            offset=record["bin_offset"],
            vector=None,
        )
        for record in function_data_list
    ]

    return functions, function_data_list

In [None]:
def binary_from_path(binary_path: pl.Path) -> m.Binary:
    """Build an (unsaved) `m.Binary` for the executable at `binary_path`.

    The MD5 digest and size come from the file itself, the package and build
    parameters from its path, and the functions from the pickle that
    `functions_from_path` reads.
    """
    with binary_path.open("rb") as binary_file:
        md5 = hashlib.file_digest(binary_file, "md5").hexdigest()

    parse_result = parse_binary_path(binary_path)
    functions, function_records = functions_from_path(binary_path)

    # The image base is taken from the first function record; -1 marks a binary
    # without any records.
    no_record = object()
    first_record = next(iter(function_records), no_record)
    image_base = -1 if first_record is no_record else first_record["img_base"]

    # NOTE(review): a new m.Package is created for every binary, whereas build
    # parameters are shared through a cache — confirm this is intended.
    return m.Binary(
        name=parse_result.executable_name,
        md5=md5,
        package=m.Package(
            name=parse_result.package_name,
            version=parse_result.package_version,
        ),
        build_parameters=build_parameters_from_binary_path_parse_result(parse_result),
        functions=functions,
        image_base=image_base,
        size=binary_path.stat().st_size,
    )

In [None]:
def binary_path_from_model(binary: m.Binary) -> pl.Path:
    """Reconstruct the on-disk path of the executable described by `binary`.

    This is the inverse of `parse_binary_path`: the dataset directory is
    ``gnu_debug`` for a plain build and ``gnu_debug_<variant>`` when one of the
    ``lto`` / ``noinline`` / ``pie`` build parameters is set, followed by the
    package name and the '_'-joined file name.

    Args:
        binary: Model with `package`, `build_parameters` and `name` populated.

    Returns:
        Absolute path below the (hard-coded) kim2023revisiting data directory.

    Raises:
        ValueError: If more than one of lto/noinline/pie is set, because a
            dataset directory encodes exactly one variant.
    """
    bparams = binary.build_parameters
    pkg = binary.package

    # Pick the dataset directory from the build-variant flags instead of always
    # using 'gnu_debug', so variant binaries resolve to their own file.
    variants = [
        variant
        for variant, enabled in (
            ("lto", bparams.lto),
            ("noinline", bparams.noinline),
            ("pie", bparams.pie),
        )
        if enabled
    ]
    if len(variants) > 1:
        raise ValueError(f"Cannot map combined build variants {variants} to a dataset directory")
    dataset_dir = f"gnu_debug_{variants[0]}" if variants else "gnu_debug"

    file_name = "_".join(
        [
            f"{pkg.name}-{pkg.version}",
            f"{bparams.compiler_backend}-{bparams.compiler_version}",
            f"{bparams.architecture}_{bparams.bitness}",
            bparams.optimisation,
            f"{binary.name}.elf",
        ]
    )

    return pl.Path(
        "/home/maringuu/workspace/sources/~maringuu/|master-thesis/evaluation/data/kim2023revisiting/",
        dataset_dir,
        pkg.name,
        file_name,
    )

In [None]:
# 500 binaries -- 36.1s
MAX_BINARIES = 500

# Directory holding the binutils executables of the plain 'gnu_debug' dataset.
package_path = pl.Path(
    "/home/maringuu/workspace/sources/~maringuu/|master-thesis/evaluation/data/kim2023revisiting/"
    "gnu_debug",
    "binutils",
)

# Build models for the first MAX_BINARIES '*.elf' files in sorted path order.
elf_paths = sorted(package_path.glob("*.elf"))
binaries = []
for binary_path in elf_paths[:MAX_BINARIES]:
    binary = binary_from_path(binary_path)
    binaries.append(binary)

In [None]:
# Fetch the hid of every binary that is already stored in the database.
stmt = sa.select(m.Binary.hid)
existing_hids = [*session.scalars(stmt)]
existing_hids

In [None]:
# Keep only the binaries whose hid is not stored yet. A set gives constant-time
# membership tests, avoiding a scan of the whole hid list for every binary.
known_hids = set(existing_hids)
missing_binaries = [binary for binary in binaries if binary.hid not in known_hids]
len(missing_binaries)

In [None]:
# Insert the new binaries. If the flush/commit fails, roll the transaction back
# so the shared session stays usable when the cell is re-run, then re-raise so
# the failure is still visible.
try:
    session.add_all(missing_binaries)
    session.commit()
except Exception:
    session.rollback()
    raise

## Ghidra and BSim Analysis

In [None]:
import studeerwerk.ghidra.load  # noqa

from studeerwerk import cfg
from studeerwerk.ghidra import types

In [None]:
from evaluatie import models as m

In [None]:
import sqlalchemy as sa
import logging
import pathlib as pl

In [None]:
# Configure the root logger at DEBUG level.
logging.basicConfig(level="DEBUG")

In [None]:
# Rebind the module-level `session` to a fresh session for the queries below.
# NOTE(review): the session created earlier in this notebook is not closed in
# the visible cells before it is replaced here.
session = m.Session()

In [None]:
# Select the 'ar' and 'objdump' binaries that were built with O0 or O3 for arm
# or mips. Note that this rebinds `binaries` to rows loaded from the database.
stmt = (
    sa.select(m.Binary)
    .join(m.Binary.build_parameters)
    .where(
        m.Binary.name.in_(["ar", "objdump"]),
        m.BuildParameters.optimisation.in_(["O0", "O3"]),
        m.BuildParameters.architecture.in_(["arm", "mips"]),
    )
)

binaries = [*session.scalars(stmt)]
len(binaries)

In [None]:
# Open the project whose marker path is configured under
# 'studeerwerk' / 'ghidra-project-path'.
# presumably `create=True` creates the project when it does not exist — TODO confirm
project = types.Project.from_marker_path(
    # "/home/maringuu/workspace/ghidra/studeerwerk/test.gpr",
    cfg.gets("studeerwerk", "ghidra-project-path"),
    create=True,
)
project

In [None]:
# Add every selected binary that the project does not contain yet, using the
# binary's hid as the program name, then save the project.
for binary in binaries:
    if binary.hid not in project.programs:
        binary_path = binary_path_from_model(binary)
        project.add_program_from_path(binary_path, name=binary.hid)

project.save()

In [None]:
# Connect using the URL configured under 'studeerwerk' / 'postgres-url'.
# presumably this is the BSim PostgreSQL database — TODO confirm
conn = types.Connection.from_url(
    # "postgresql://postgres:postgres@localhost:5432/test",
    cfg.gets("studeerwerk", "postgres-url"),
)
conn

In [None]:
# Signature generator bound to `conn`; used below to generate and insert the
# signatures of each analysed program.
signature_generator = types.SignatureGenerator.from_connection(conn)
signature_generator

In [None]:
def get_analyze_only_known_functions_impl_fn(binary: m.Binary):
    """Return a callback that creates only the functions stored for `binary`.

    The returned callable takes a `types.Program`, loads all `m.Function` rows
    belonging to `binary` through the module-level `session`, and for each one
    creates a function and starts disassembly at `image_base + offset`.
    """

    def _impl(program: types.Program):
        known_functions_stmt = sa.select(m.Function).where(m.Function.binary_id == binary.id)

        # The address arithmetic below relies on the address space starting at 0.
        address_space = program.api.toAddr(0).getAddressSpace()
        min_addr = address_space.getMinAddress().getOffset()
        assert min_addr == 0, hex(min_addr)

        known_functions = list(session.scalars(known_functions_stmt))
        for function in known_functions:
            addr = program.api.toAddr(binary.image_base + function.offset)
            # NOTE(review): the function is created before the address is
            # disassembled — confirm this order is what the Ghidra API expects.
            program.api.createFunction(addr, None)
            program.api.disassemble(addr)

    return _impl

In [None]:
# 8 Executables (ar, O0-O3, arm32 and arm64) took 8 minutes (analysis only)
# The generation step was 2m16s
# 20 Executables took 46m16s
#
# Analyse every selected binary that is present in the project and insert its
# signatures. A failing signature generation is logged and the loop continues
# with the next binary.
for binary in binaries:
    if (program := project.programs.get(binary.hid, None)) is None:
        continue

    program.analyze(
        options=types.SignatureGenerator.ANALYSIS_OPTIONS,
        force=False,
        # analyze_impl=get_analyze_only_known_functions_impl_fn(binary),
    )

    # XXX It should be possible to check if this is already done.
    try:
        signature_generator.generate_and_insert(program)
    except RuntimeError:
        # Log with traceback and the affected binary so failures can be traced
        # back after a long run.
        logging.exception("Signature generation failed for binary %s", binary.hid)

In [None]:
# Save the project again after the analysis / signature generation cell.
project.save()