In [None]:
import mmap
import re
import shutil
from pathlib import Path
from typing import cast

import lief
import pefile
import polars as pl
import numpy as np

Before starting:

Launch `fake_neomon_host/host-bind.exe` with x64dbg under ScyllaHide and reach the NeoMon.dll OEP (0x10013903). Make sure ASLR is off for both .exe and .dll.

Then collect data:
- x64dbg/Symbols -> `dumps/dump-imports.csv`
- x64dbg/ModulePathListExports -> `NeoMon.dll.export.full.csv`
- x64dbg/ModulePathListImports -> `NeoMon.dll.import.full.csv`
- Dump the sections 0x0213 .. 0x0218 that Themida created inside NeoMon.dll to `fake_neomon_host/neomon213.bin`, `neomon214.bin`, etc. (one file per section)
- Dump the NeoMon.dll using Scylla (you can specify 0x13903 OEP, but do not import IAT or fix dump)
- Open that NeoMon_dump.dll in IDA Pro and launch `scripts/extract-old-iat.py`
- and `scripts/extract-byte-calls.py`

# Paths

Everything here is done on a simple dllhost exe `fake_neomon_host/host-bind.exe`

Dumps are made once the _DllEntryPoint is reached (0x13903)

In [None]:
# Root folders, relative to the notebook: the input CSV dumps are read from
# `base`, the generated patch CSVs are written under `base_patch`.
base = Path("../neomon-dump/dumps")
base_patch = Path("../neomon-dump/patches")

In [None]:
# Input CSVs (collected manually, see the "Before starting" notes)

# x64dbg Symbols view export
dump_imports_p = base / "dump-imports.csv"
# presumably produced by the IDA scripts extract-old-iat.py / extract-byte-calls.py — confirm
old_iat_p = base / "old-iat.csv"
byte_calls_p = base / "broken-byte-calls.csv"

# ModulePathList plugin exports; they live one level above the dumps folder
module_imports_p = base / "../NeoMon.dll.import.full.csv"
module_exports_p = base / "../NeoMon.dll.export.full.csv"

In [None]:
# Output CSVs: patch lists generated by this notebook
# NOTE(review): patch_thunks_p is not referenced by any cell shown here — confirm it is still needed
patch_thunks_p = base_patch / "thunks_patch.csv"
patch_calls_p = base_patch / "calls_patch.csv"
patch_iat_p = base_patch / "iat_patch.csv"

In [None]:
# Folder holding the host exe, the Scylla dump and the raw neomon{N}.bin section dumps
base_to_exe = Path("../fake_neomon_host")
# The untouched dump (input) and the copy that receives every modification (output)
original_dump_path = base_to_exe / "NeoMon_dump.dll"
patched_path = base_to_exe / "NeoMon_patched.dll"

In [None]:
# Identifiers of the Themida-created sections to process. An id `i` selects the file
# neomon{i}.bin and is matched textually against the leading hex digits of addresses
# (e.g. 213 <-> 0x213....), so the ids are hex digit groups written as decimal ints.
# NOTE(review): the setup notes mention sections 0x0213 .. 0x0218, but this range only
# covers 213..216 — confirm which sections are really required.
fake_sections = range(213, 217)

In [None]:
from addr_helpers import int_to_LE, rel_call, to_bin, from_bin

# Column layout shared by the patch tables built in this notebook. Everything is kept
# as a string: the address to patch, the bytes expected there, and the replacement bytes.
patch_schema = {
    "patch_addr": pl.String,
    "mem_old": pl.String,
    "patch": pl.String,
}

# Parsing import table

In [None]:
def to_int_expr(col: str = "Address") -> pl.Expr:
    """Build an expression that parses a prefixed hex string column into integers.

    The first two characters of every value (expected to be the "0x" prefix) are
    dropped and the remainder is parsed as a base-16 number.
    """
    hex_digits = pl.col(col).str.slice(2)
    return hex_digits.str.to_integer(base=16)


def addr_to_int(df: pl.DataFrame, col: str = "Address") -> pl.DataFrame:
    """Return `df` with the hex-string column `col` replaced by its integer value."""
    parsed = to_int_expr(col)
    return df.with_columns(parsed)


def int_to_addr(
    df: pl.DataFrame, col: str = "Address", sort: bool = False
) -> pl.DataFrame:
    """Return `df` with the integer column `col` rendered as "0x..." hex strings.

    The formatted column is passed through `normalize_address`, which also sorts
    the frame by that column when `sort` is true.
    """
    values = df[col].to_numpy()
    lowered = np.char.lower(np.char.mod("%x", values))
    prefixed = pl.Series(col, np.char.add("0x", lowered))
    return normalize_address(df.with_columns(prefixed), col=col, sort=sort)


def normalize_address(
    df: pl.DataFrame, col: str = "Address", sort: bool = True
) -> pl.DataFrame:
    """Bring the hex-string column `col` into one canonical spelling.

    Values end up lowercase, prefixed with exactly one "0x" and without leading
    zeros; an all-zero value becomes "0x0". With `sort` set, rows are ordered by
    the numeric value of the column.
    """
    current = pl.col(col)

    # strip an existing prefix (if any), then the padding zeros
    digits = (
        pl.when(current.str.starts_with("0x"))
        .then(current.str.strip_prefix("0x"))
        .otherwise(col)
        .str.to_lowercase()
        .str.strip_chars_start("0")
    )
    canonical = df.with_columns(("0x" + digits).alias(col)).with_columns(
        # zero lost all of its digits above; restore a single one
        pl.when(current == "0x").then(pl.lit("0x0").alias(col)).otherwise(col)
    )
    return canonical.sort(to_int_expr(col)) if sort else canonical

To match asm calls and jumps with functions, we collect all exported functions into `dump_imports`

In [None]:
# Load the x64dbg symbol export and give its five columns fixed English names
# (whatever header the CSV carries is replaced by this assignment).
dump_imports = pl.read_csv(dump_imports_p)
dump_imports.columns = [
    "Address",
    "Type",
    "Ordinal",
    "Symbol",
    "undecorated",
]
dump_imports = dump_imports.drop("undecorated")  # the undecorated name is not used
# The Cyrillic literal below is the localized label for "Export"; every other
# value of the Type column is treated as an import.
dump_imports = dump_imports.with_columns(
    pl.when(pl.col("Type") == "Экспорт")
    .then(pl.lit("Export").alias("Type"))
    .otherwise(pl.lit("Import").alias("Type"))
)  # russian to english
dump_imports = dump_imports.filter(
    pl.col("Symbol") != "OptionalHeader.AddressOfEntryPoint"
)  # OEPs are never referenced
dump_imports = normalize_address(dump_imports, "Address")  # remove leading 0s (also sorts by address)

print(dump_imports.shape)
dump_imports.sample(3)

Default view doesn't mention module names. ModulePathList plugin does.

module_exports collects exported entries for each module. \
module_imports collects IAT for each module (IAT may be hidden by themida, these are not parsed)

In [None]:
# Exported symbols per module, one row per address
module_exports = pl.read_csv(module_exports_p)
module_exports = module_exports.filter(
    ~pl.col("Module").str.ends_with(".exe")
)  # not relevant
module_exports = normalize_address(module_exports, "Address")  # remove leading 0s
module_exports = module_exports.unique("Address")  # remove aliases
print("Exports:", module_exports.shape)

# IAT slots per module. The original "Function"/"Module" columns are dropped and
# "Symname"/"Modname" take over those names; after the rename, "IAT_addr" is the
# slot location and "Address" is the value stored in the slot.
module_imports = (
    pl.read_csv(module_imports_p)
    .drop("Function", "Module")
    .rename(
        {
            "Address": "IAT_addr",  # IAT line address
            "Bytes": "Address",  # IAT line content
            "Modname": "Module",
            "Symname": "Function",
        }
    )
)
module_imports = normalize_address(module_imports, "Address")
module_imports = normalize_address(module_imports, "IAT_addr")
print("Imports:", module_imports.shape)

Themida sometimes obfuscates calls by routing them through the IAT of a third-party module. We collect all of those IAT entries so that such calls can be deobfuscated. \
`proxy_imports` now contains IATs of modules. \
`dump_imports` now contains only exported functions (to map module names)

In [None]:
# Import-type symbols are IAT slots of other modules: attach the slot content
# ("Target") by matching the symbol address with the slot address.
proxy_imports = dump_imports.join(
    module_imports.rename({"IAT_addr": "Address", "Address": "Target"}),
    on="Address",
    how="left",
)
proxy_imports = proxy_imports.filter(pl.col("Type") == "Import").drop(
    "Type", "Ordinal", "Symbol", "Module"
)

# From here on dump_imports holds exported symbols only
dump_imports = dump_imports.filter(pl.col("Type") == "Export").drop("Type")
# map with module names
dump_imports = dump_imports.join(module_exports, on="Address", how="left")
# Names only seem correct for exported symbols
dump_imports = dump_imports.drop("Function").rename({"Symbol": "Function"})

# map with module names
# (inner join: slots whose Target is not a known export are dropped here)
proxy_imports = proxy_imports.join(
    dump_imports.select("Address", "Module"), left_on="Target", right_on="Address"
)

print("IAT Imports:", proxy_imports.shape)
print("True imports:", dump_imports.shape)

In [None]:
# Every exported symbol and every proxy IAT slot must have been attributed to a module
assert dump_imports.filter(pl.col("Module").is_null()).shape[0] == 0
assert proxy_imports.filter(pl.col("Module").is_null()).shape[0] == 0

# Gathering imports from old IAT

In [None]:
# Old IAT as extracted from the dump: one row per slot
iat = pl.read_csv(old_iat_p)
print(iat.shape)

# "Calladdr" becomes the slot location, "Address" the value stored in the slot
iat = iat.rename({"Address": "Calladdr", "Destination": "Address"})
# the destinations are prefixed with "0x" here before being normalized
iat = iat.with_columns(("0x" + pl.col("Address").str.to_lowercase()).alias("Address"))
iat = normalize_address(iat, sort=False)  # keep the original slot order
# resolve each destination against the exported symbols (unmatched rows keep nulls)
iat = iat.join(dump_imports.unique("Address"), on="Address", how="left")
print(iat.shape)

In [None]:
# Slots whose destination lies in one of the fake sections get the pseudo module
# name "section_<id>". The test is textual: the three hex digits that follow "0x"
# in the normalized address must equal the section id.
for i in fake_sections:
    modname = f"section_{i}"
    iat = iat.with_columns(
        pl.when(pl.col("Address").str.slice(2, 3) == str(i))
        .then(pl.lit(modname))
        .otherwise("Module")
        .alias("Module")
    )

Make sure iat2 is empty, i.e. no unknown calls present

In [None]:
# Slots without a module that are not plain zero entries would be unknown destinations
iat2 = iat.filter(pl.col("Module").is_null())
iat2 = iat2.filter(pl.col("Address") != "0x0")
assert iat2.shape[0] == 0

Cancel forwarding imports (e.g. kernel32.HeapAlloc -> ntdll.RtlAllocateHeap)

In [None]:
# Modules that IAT destinations may resolve into through export forwarding
dll_forward_to = {"ntdll.dll", "kernelbase.dll"}

def get_unforward_map(
    forwarding_modules=("kernel32.dll", "user32.dll"),
) -> dict[str, tuple[str, str]]:
    """Map forwarded-to function names back to the export that forwards to them.

    Each module in `forwarding_modules` is loaded from C:/Windows/SysWOW64/ and its
    export table is scanned for forwarder entries.

    Args:
        forwarding_modules: File names of the DLLs whose forwarders are collected.

    Returns:
        A dict keyed by the forwarder string with a leading "NTDLL." removed
        (forwarders into other modules keep their module prefix). The value is
        `(forwarding module, export name)`; exports without a name are reported as
        "Ordinal#<n>". When several exports forward to the same target, the last
        one scanned wins.

    NOTE(review): only the "NTDLL." prefix is stripped, so forwarders into any
    other module never match a bare function name — confirm that this is enough
    for the kernelbase.dll entries listed in `dll_forward_to`.
    """
    systemroot = "C:/Windows/SysWOW64/"

    unforward_map: dict[str, tuple[str, str]] = dict()

    for modname in forwarding_modules:
        number = 0

        dll = pefile.PE(systemroot + modname)
        try:
            dll.full_load()
            for exp in dll.DIRECTORY_ENTRY_EXPORT.symbols:
                if not exp.forwarder:
                    continue
                number += 1
                name = exp.name.decode() if exp.name else f"Ordinal#{exp.ordinal}"
                target = exp.forwarder.decode().removeprefix("NTDLL.")
                unforward_map[target] = (modname, name)
        finally:
            # release the file handle/mapping pefile keeps for the loaded image
            dll.close()

        print(f"For {modname} there are {number} forwards")
    return unforward_map


# Forwarder lookup for the default modules (reads the system DLLs from disk)
unforward_map = get_unforward_map()

In [None]:
# Replace every IAT entry that resolved into a forward-target module by the
# (module, function) pair that forwards to it, one function name at a time.
for func in iat.filter(pl.col("Module").is_in(dll_forward_to))["Function"]:
    if func not in unforward_map:
        # NOTE(review): the message always names ntdll.dll although dll_forward_to
        # also contains kernelbase.dll
        print(f"Func {func} from ntdll.dll is not found in forward map")
        continue

    origmod, origfunc = unforward_map[func]

    # extra trace output for the InitializeCrit* functions only
    if "InitializeCrit" in func:
        print(func, origmod, origfunc)

    # rows to rewrite: same function name, still attributed to a forward target
    condition = (pl.col("Module").is_in(dll_forward_to)) & (pl.col("Function") == func)
    iat = iat.with_columns(
        [
            pl.when(condition)
            .then(pl.lit(origmod))
            .otherwise("Module")
            .alias("Module"),
            pl.when(condition)
            .then(pl.lit(origfunc))
            .otherwise("Function")
            .alias("Function"),
        ]
    )

In [None]:
# Snapshot of the resolved IAT; the file name is the input path with "2.csv"
# appended (i.e. "old-iat.csv2.csv")
iat.write_csv(str(old_iat_p) + "2.csv")

In [None]:
# Split the IAT, ordered by slot address, into runs of consecutive slots that
# share a module. The run id is computed before the filters, so zero entries,
# unresolved slots and fake-section slots still separate runs even though they
# are removed afterwards.
# NOTE(review): "Calladdr" is sorted as a string — this assumes all slot
# addresses have the same width and case; confirm.
iat_seg = (
    iat.sort("Calladdr")
    .fill_null("")
    .with_columns(
        (pl.col("Module") != pl.col("Module").shift(1)).cum_sum().alias("segment_id")
    )
    .fill_null(0)
    .filter(to_int_expr("Address") != 0)
    .filter(pl.col("Module") != "")
    .filter(~pl.col("Module").str.starts_with("section_"))
)
# one DataFrame per run, in slot order
segments = [
    group.drop("segment_id")
    for _, group in iat_seg.group_by("segment_id", maintain_order=True)
]

In [None]:
# extract obfuscated imports for later:
# non-zero slots whose destination lies inside one of the fake sections
obfuscated = iat.filter(pl.col("Module").str.starts_with("section_")).filter(
    to_int_expr("Address") != 0
)
obfuscated

In [None]:
# Remove gaps: slots that were not attributed to any module.
# NOTE(review): fake-section ("section_*") slots carry a module name and therefore
# survive this filter — confirm whether they were meant to be removed here as well.
w = iat.shape[0]
iat = iat.filter(pl.col("Module").is_not_null())
# report how many rows were dropped (not how many remain)
print(f"Filtered out {w - iat.shape[0]}/{w} iat entries")

In [None]:
# confirm all names are decorated: no Function value may contain "public"
# (presumably a marker of undecorated C++ names — confirm)
assert iat.filter(pl.col("Function").str.contains("public")).shape[0] == 0

# Gathering imports from fake sections

In [None]:
def find_pattern_in_file(path: Path, pattern: str, first_only: bool = False) -> list[int]:
    """
    Search a binary file for a pattern like "90 e8 ? ? ? ?".

    The pattern is a whitespace-separated list of tokens: hex byte values, or "?"
    for "any single byte". Matches are found left to right and do not overlap.

    Returns a list of integer offsets (file positions) where the pattern matches.
    If first_only is True, returns a list with at most one offset.
    An empty file yields an empty list.
    """
    parts = [
        # "?" becomes the regex "any byte"; everything else is an escaped literal
        b"." if token == "?" else re.escape(bytes.fromhex(token))
        for token in pattern.split()
    ]
    regex = re.compile(b"".join(parts), flags=re.DOTALL)

    offsets: list[int] = []
    with open(path, "rb") as f:
        # mmap refuses to map a zero-length file, and nothing can match in it anyway
        if f.seek(0, 2) == 0:
            return offsets
        # the context manager unmaps the file even if the scan raises
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            for m in regex.finditer(mm):
                offsets.append(m.start())
                if first_only:
                    break
    return offsets

In [None]:
def make_fake_calls_table(idx: int, pattern: str) -> pl.DataFrame:
    """List every match of a call/jump byte pattern inside one dumped fake section.

    Args:
        idx: Section id; the bytes are read from `neomon{idx}.bin` in `base_to_exe`.
        pattern: Search pattern in `find_pattern_in_file` syntax. It must start with
            one of the opcodes e8, e9, 0f 84, 0f 85, ff 15 or ff 25.

    Returns:
        A frame with the columns "subroutine", "Instruction", "Call address",
        "Destination" and "Resolved name". "Call address" is the file offset of the
        match. For the rel32 forms "Destination" is the offset of the following
        instruction plus the decoded operand; for ff 15 / ff 25 it is the decoded
        operand itself. Both are hex strings; the remaining columns are null.

    Raises:
        RuntimeError: If the pattern starts with an unsupported opcode.

    NOTE: the "jne-near"/"je-near" labels are the names the rest of the notebook
    uses for 0f 84 / 0f 85. In Intel mnemonics 0F 84 is JE and 0F 85 is JNE; the
    labels are kept as they are because the patch helpers key on them.
    NOTE(review): whether `parse_dest` decodes the operand as a signed value is
    not visible here — confirm that backward rel32 targets come out right.
    """
    pattern = pattern.lower()
    fn = base_to_exe / f"neomon{idx}.bin"

    # opcode prefix -> (label, operand offset, instruction length, operand is relative)
    encodings = {
        "e8": ("call-near", 1, 5, True),
        "e9": ("jmp-near", 1, 5, True),
        "0f 84": ("jne-near", 2, 6, True),
        "0f 85": ("je-near", 2, 6, True),
        "ff 15": ("call-far", 2, 6, False),
        "ff 25": ("jmp-far", 2, 6, False),
    }
    encoding = next(
        (spec for prefix, spec in encodings.items() if pattern.startswith(prefix)),
        None,
    )
    if encoding is None:
        raise RuntimeError(f"Unsupported pattern {pattern}")
    inst, operand_at, inst_len, relative = encoding

    calls = find_pattern_in_file(fn, pattern)

    dest = []
    if calls:
        # both handles are released on exit, also when decoding fails
        with open(fn, "rb") as f, mmap.mmap(
            f.fileno(), 0, access=mmap.ACCESS_READ
        ) as mm:
            for ea in calls:
                operand = ea + operand_at
                # rel32 operands count from the next instruction, imm32 ones are absolute
                origin = ea + inst_len if relative else 0
                dest.append(origin + parse_dest(mm[operand : operand + 4]))

    return pl.DataFrame(
        {
            "subroutine": None,
            "Instruction": inst,
            "Call address": list(map(hex, calls)),
            "Destination": list(map(hex, dest)),
            "Resolved name": None,
        }
    )


def parse_dest(addr: bytes) -> int:
    """Decode a raw operand by round-tripping it through `to_bin` and `from_bin`."""
    raw = bytearray(addr)
    encoded = to_bin(raw)
    return from_bin(encoded)


In [None]:
# Byte patterns scanned in every fake section. The trailing comments are the labels
# this notebook assigns; in Intel mnemonics 0F 84 is JE/JZ and 0F 85 is JNE/JNZ,
# and FF 15 / FF 25 are indirect call/jmp through a 32-bit memory operand.
patterns = [
    "e8 ? ? ? ?",  # call-near
    "e9 ? ? ? ?",  # jmp-near
    "ff 25 ? ? ? ?",  # jmp-far
    "ff 15 ? ? ? ?",  # call-far
    "0f 84 ? ? ? ?",  # jne-near
    "0f 85 ? ? ? ?",  # je-near
]

# empty accumulator with the final column types
fake_calls = pl.DataFrame(
    schema={
        "subroutine": pl.String,
        "Instruction": pl.String,
        "Call address": pl.String,
        "Destination": pl.String,
        "Resolved name": pl.String,
        "Section": pl.Int32,
    }
)
for idx in fake_sections:
    for patt in patterns:
        fake_calls = fake_calls.vstack(
            make_fake_calls_table(idx, patt).with_columns(pl.lit(idx).alias("Section"))
        )
fake_calls = fake_calls.sort("Section", to_int_expr("Call address"))
# section id -> address string of the section in the dump, e.g. 213 -> "0x2130000"
fake_calls = fake_calls.with_columns(
    ("0x" + pl.col("Section").cast(dtype=pl.String) + "0000").alias("Section_addr")
)
# "far" destinations are already absolute; the others are offsets inside the
# section file and get the section address added (result is an integer column)
fake_calls = fake_calls.with_columns(
    pl.when(pl.col("Instruction").str.ends_with("far"))
    .then(to_int_expr("Destination"))
    .otherwise(to_int_expr("Destination") + to_int_expr("Section_addr"))
)
fake_calls = fake_calls.sort(to_int_expr("Call address"))
# back to normalized hex strings
fake_calls = int_to_addr(
    fake_calls,
    "Destination",
)

print(fake_calls.shape)

In [None]:
# Keep only destinations strictly between 0x67780000 and 0x77E00000
# (presumably the address range of the loaded system DLLs in this session — confirm)
fake_calls = fake_calls.filter(to_int_expr("Destination") > 0x67780000).filter(
    to_int_expr("Destination") < 0x77E00000
)
# temp_fake: same rows, with "Call address" turned from a file offset into an
# address by adding the section address
temp_fake = int_to_addr(
    fake_calls.with_columns(to_int_expr("Call address") + to_int_expr("Section_addr")),
    "Call address",
)
temp_fake = temp_fake.drop("Resolved name", "Section_addr")
print(temp_fake.shape)
# temp_fake.filter(pl.col("Section") == 213).write_csv('temp.csv')

In [None]:
valid_addresses = set(
    dump_imports["Address"].to_list() + proxy_imports["Address"].to_list()
)

fake_calls = fake_calls.filter(pl.col("Destination").is_in(valid_addresses))
print("Fake calls:", fake_calls.shape)
print("Obfuscated IAT entries:", obfuscated.shape)

In [None]:
# Same address conversion as for temp_fake, applied to the calls that hit a known address
temp_fake2 = int_to_addr(
    fake_calls.with_columns(to_int_expr("Call address") + to_int_expr("Section_addr")),
    "Call address",
)

# in-range calls whose destination is NOT a known export or IAT slot
temp_fake3 = temp_fake.filter(
    ~pl.col("Destination").is_in(temp_fake2["Destination"].to_list())
)

print(temp_fake3.shape)
# written to the current working directory for manual inspection
temp_fake3.write_csv("temp.csv")

Basically, `temp_fake` contains only valid calls (checked manually), yet they point neither at exported functions nor even at the first instruction of a function. Scary.

There is no reason to compare the obfuscated IAT entries against `fake_calls`: some of those entries point at very short pieces of arithmetic code.

In [None]:
# From here on fake_calls is the range-filtered table with converted call addresses
# (the "known address" filter applied earlier is discarded by this assignment)
fake_calls = temp_fake
print("Fake calls:", fake_calls.shape)
print("Obfuscated IAT entries:", obfuscated.shape)
fake_calls.head(3)

In [None]:
# Replace the numeric section id by an address string, e.g. 213 -> "0x02130000"
fake_calls_fix = fake_calls.with_columns(("0x0" + pl.col("Section").cast(str) + "0000").alias("Section"))
fake_calls_fix = fake_calls_fix.sort("Call address")

# Calls whose destination is an IAT slot of another module ("double IAT"):
# only rows that found a matching proxy_imports entry are kept
double_iat = fake_calls_fix.join(
    proxy_imports, left_on="Destination", right_on="Address", how="left"
).filter(pl.col("Target").is_not_null())
print(double_iat.shape)
double_iat.head(3)

# Move fake sections to the end

In [None]:
# Work on a copy so that the original dump stays untouched
shutil.copy(original_dump_path, patched_path)

pe_lief = cast(lief.PE.Binary, lief.PE.parse(patched_path))
# drop whatever imports the dump carries before the file is rebuilt
pe_lief.remove_all_imports()

In [None]:
def get_data(sec_i: int) -> bytes:
    """Return the raw bytes of the dumped fake section `neomon{sec_i}.bin`.

    The file is looked up in `base_to_exe` and is closed again before returning.
    """
    return (base_to_exe / f"neomon{sec_i}.bin").read_bytes()

In [None]:
# Section id -> address at which the re-added section ends up in the patched file
fake_sections_map = pl.DataFrame(schema={"Section": pl.Int64, "Baseaddr": pl.String})

for sec_i in fake_sections:
    # new section ".fake<id>" filled with the dumped bytes
    sec = lief.PE.Section(f".fake{sec_i}")
    data = get_data(sec_i)
    sec.content = memoryview(data)

    sec.virtual_size = len(data)

    CH = lief.PE.Section.CHARACTERISTICS
    # sec.characteristics = int(CH.MEM_READ | CH.MEM_WRITE | CH.MEM_EXECUTE | CH.CNT_INITIALIZED_DATA)
    # NOTE(review): the active flags omit MEM_READ/MEM_WRITE — confirm this is intended
    sec.characteristics = int(CH.MEM_EXECUTE | CH.CNT_INITIALIZED_DATA)

    pe_lief.add_section(sec)

    # look the section up again to read the virtual address it now has in the binary
    sec = pe_lief.get_section(f".fake{sec_i}")
    if sec is None:
        print(f"Error: failed to add section {sec_i}")
    else:
        fake_sections_map = fake_sections_map.vstack(
            pl.DataFrame(
                {
                    "Section": sec_i,
                    "Baseaddr": hex(pe_lief.imagebase + sec.virtual_address),
                }
            )
        )

In [None]:
# Rebuild the PE (with the import-rebuilding option enabled) and overwrite the
# patched file so that it contains the appended sections
config = lief.PE.Builder.config_t()
config.imports = True

bb = lief.PE.Builder(pe_lief, config)
bb.build()
bb.write(str(patched_path))

# Constructing new IDT

In [None]:
def create_32bit_ordinal_import(ordinal_number: int) -> lief.PE.ImportEntry:
    """
    Build a PE32 import entry that refers to a function by ordinal.

    Args:
        ordinal_number: The ordinal number (0-65535)

    Raises:
        ValueError: If the ordinal does not fit into 16 bits.
    """
    if ordinal_number > 0xFFFF or ordinal_number < 0:
        raise ValueError("Ordinal number must be between 0 and 65535")

    # 32-bit thunk layout: bit 31 flags "import by ordinal", bits 30-16 stay
    # zero, bits 15-0 carry the ordinal itself
    ordinal_flag = 1 << 31
    return lief.PE.ImportEntry(ordinal_flag | ordinal_number, lief.PE.PE_TYPE.PE32)

In [None]:
from pydantic import BaseModel


class Sect(BaseModel):
    """Plain-data copy of the header fields of a `lief.PE.Section`."""

    name: str
    raw_addr: int
    raw_size: int
    virt_addr: int
    virt_size: int
    chars: int

    @staticmethod
    def from_section(section: lief.PE.Section) -> "Sect":
        """Capture name, file offset/size, virtual address/size and flags of `section`."""
        fields = {
            "name": section.name,
            "raw_addr": section.offset,
            "raw_size": section.size,
            "virt_addr": section.virtual_address,
            "virt_size": section.virtual_size,
            "chars": section.characteristics,
        }
        return Sect(**fields)

    def to_section(self) -> lief.PE.Section:
        """Create a new `lief.PE.Section` carrying the stored header fields."""
        rebuilt = lief.PE.Section(self.name)
        # (lief attribute, stored value), assigned in this order
        assignments = (
            ("name", self.name),
            ("offset", self.raw_addr),
            ("size", self.raw_size),
            ("virtual_address", self.virt_addr),
            ("virtual_size", self.virt_size),
            ("characteristics", self.chars),
        )
        for attribute, value in assignments:
            setattr(rebuilt, attribute, value)
        return rebuilt

In [None]:
# adding proper .idata
# new_idata = lief.PE.Section(".idata")
# new_idata.offset = idata_offset
# new_idata.size = idata_size
# new_idata.virtual_address = idata_virtual_address - pe_lief.imagebase
# new_idata.virtual_size = idata_virtual_size
# new_idata.characteristics = pe_lief.sections[0].characteristics
# pe_lief.add_section(new_idata)

# sections = [Sect.from_section(sec) for sec in pe_lief.sections]
# sections = [sections[0], Sect.from_section(new_idata)] + sections[1:]

# N = len(pe_lief.sections)
# for i in range(N):
#     pe_lief.sections[i].name = f'{i}'

# for i in range(N):
#     pe_lief.remove_section(f'{i}')

# for sec in sections:
#     pe_lief.add_section(sec.to_section())

In [None]:
# Re-read the file written above so that pe_lief reflects the rebuilt layout
pe_lief = cast(lief.PE.Binary, lief.PE.parse(patched_path))

In [None]:
# Overview: section name, start RVA and end RVA (start + virtual size)
for s in pe_lief.sections:
    print(s.name, hex(s.virtual_address), hex(s.virtual_address + s.virtual_size))

### Adding imports

Creates brand new IDT with new IAT and ILT

In [None]:
# One import descriptor per IAT segment, entries in slot order
for seg in segments:
    dll = seg["Module"][0]
    if dll is None or dll == "":
        continue

    mod = pe_lief.add_import(dll)
    # the unpacking relies on each segment having exactly these five columns in this order
    for calladdr, addr, ordinal, func, mname in seg.rows():
        if func.startswith("Ordinal#"):
            # the ordinal comes from the table column rather than from the name
            # ordinal = int(func.removeprefix("Ordinal#"))
            entry = create_32bit_ordinal_import(ordinal)
        else:
            entry = lief.PE.ImportEntry(func)
        mod.add_entry(entry)

Add imports that were not present in the original IAT

In [None]:
# lenseg tracks the expected number of import descriptors: one per segment
# plus one per module added here
lenseg = len(segments)
for dll in double_iat.unique("Module")['Module']:
    mod = pe_lief.add_import(dll)
    lenseg += 1
    # each distinct function of that module is imported once, by name
    for func in double_iat.filter(pl.col("Module") == dll).unique("Function")["Function"]:
        mod.add_entry(func)

In [None]:
# Rebuild the PE including the import table and overwrite the patched file
config = lief.PE.Builder.config_t()
config.imports = True

bb = lief.PE.Builder(pe_lief, config)
bb.build()
bb.write(str(patched_path))

Reset IAT to the old IAT address

In [None]:
# Reopen the rebuilt file with pefile, which is used to edit the import descriptors
pe = pefile.PE(patched_path)
pe.full_load()

In [None]:
# pefile must see every descriptor that was added. The hint in the message
# presumably refers to pefile's MAX_REPEATED_ADDRESSES limit — confirm.
assert len(pe.DIRECTORY_ENTRY_IMPORT) == lenseg, (  # type: ignore
    "Change the MAX_REPEATED_ADDRESSES to >20"
)

In [None]:
# Point the i-th descriptor's FirstThunk at the first slot of the i-th segment in
# the old IAT. This relies on the descriptors being in the same order as `segments`.
for i, seg in enumerate(segments):
    first_thunk = int(seg["Calladdr"][0], 16)

    # FirstThunk is stored relative to the image base
    pe.DIRECTORY_ENTRY_IMPORT[i].struct.FirstThunk = (  # type: ignore
        first_thunk - pe.OPTIONAL_HEADER.ImageBase  # type: ignore
    )

### Fixing calls to new IAT entries

In [None]:
# "New destination" starts as a copy of the old destination and is overwritten
# with the address of the matching entry in the rebuilt import table
double_iat = double_iat.with_columns(pl.col("Destination").alias("New destination"))

# the modules added for double_iat are the last descriptors, so walk them from the end
for i in range(double_iat.unique("Module").shape[0]):
    mod_entry = pe.DIRECTORY_ENTRY_IMPORT[-i - 1]
    dll = mod_entry.dll.decode()
    for j in range(len(mod_entry.imports)):
        entry = mod_entry.imports[j]
        # assumes every entry here is imported by name (name is not None)
        func = entry.name.decode()

        double_iat = double_iat.with_columns(
            pl.when((pl.col("Module") == dll) & (pl.col("Function") == func))
            .then(pl.lit(hex(entry.address)).alias("New destination"))
            .otherwise(pl.col("New destination"))
        )

double_iat.head(3)

### Fixing IAT to the fake sections

In [None]:
# For every ".fake<id>" section of the patched file: the pseudo module name used in
# the IAT table, the new address (NAddress) and the old address (OAddress).
# NOTE(review): s.Name is used undecoded of any padding; ".fake" plus a three-digit
# id fills the 8-byte name field exactly — shorter ids would presumably carry
# trailing NUL bytes, confirm.
new_fake_sections = pl.DataFrame(
    [
        {
            "Module": s.Name.decode().replace(".fake", "section_"),
            "NAddress": hex(s.VirtualAddress + pe.OPTIONAL_HEADER.ImageBase),  # type: ignore
            "OAddress": f"0x0{s.Name.decode().replace('.fake', '')}0000",
        }
        for s in pe.sections
        if s.Name.decode().startswith(".fake")
    ]
)

In [None]:
# Attach old/new section addresses to every obfuscated slot (matched on the
# pseudo module name); the name columns are not needed for patching
obfuscated_iat = obfuscated.join(new_fake_sections, on="Module").drop(
    "Ordinal", "Function"
)
obfuscated_iat.head(3)

In [None]:
# One patch row per obfuscated slot: the stored pointer is shifted by the distance
# its section moved (new address - old address)
iat_patch = pl.DataFrame(schema=patch_schema)
# the unpacking relies on obfuscated_iat having exactly these five columns in this order
for calladdr, addr, module, new_addr, old_addr in obfuscated_iat.rows():
    offset = int(new_addr, 16) - int(old_addr, 16)
    naddr = int(addr, 16) + offset

    add_iat = pl.DataFrame(
        {
            "patch_addr": calladdr,
            "mem_old": to_bin(int_to_LE(int(addr, 16))),
            "patch": to_bin(int_to_LE(naddr)),
        }
    )
    iat_patch = iat_patch.vstack(add_iat)

print(iat_patch.shape)
iat_patch.head(3)

In [None]:
# Persist the IAT pointer patches
iat_patch.write_csv(patch_iat_p)

### Saving stuff

In [None]:
# pefile still has patched_path open, so the result is first written to a file
# named "tmp" in the current working directory and moved over the target after closing
temp = "tmp"
pe.write(filename=temp)
pe.close()
shutil.move(temp, patched_path)

# Fix calls

Suspect calls are <90 e8 ? ? ? ?> and <e8 ? ? ? ? 90>. The only concerning calls are "optimized". The rest point to thunks (restored) or iat (restored).

In [None]:
# Calls found in the main code, restricted to the listed instruction kinds
calls = pl.read_csv(byte_calls_p).filter(
    pl.col("Instruction").is_in(["call", "call2", "jmp"])
)
calls = calls.drop("Resolved name")
# append the calls found in the fake sections (same columns once "Section" is dropped)
calls = calls.vstack(fake_calls.drop("Section"))

# calls_inst = pl.read_csv(inst_calls_p).clear() # deprecated
# calls = calls.vstack(calls_inst).unique("Call address")
# numeric copy of the call address, used for the neighbour test in the next step
calls = calls.with_columns(to_int_expr("Call address").alias("Int_addr"))

print(calls.shape)
calls.head(3)

In [None]:
# remove dupes (either way they couldn't lead to real api call):
# a row is dropped when another row starts exactly one byte before it
addrs = set(calls["Int_addr"].to_list())
calls = calls.filter(~(pl.col("Int_addr") - 1).is_in(addrs)).drop("Int_addr")

print(calls.shape)

In [None]:
# filter only calls that point to api calls directly (still rel32 though)
# (valid_addresses is redefined here: exported functions only, no IAT slots)
valid_addresses = set(dump_imports["Address"].to_list())
calls = calls.filter(pl.col("Destination").is_in(valid_addresses))

In [None]:
# Destination -> one IAT slot that holds it (a single slot is kept per destination)
iats_to_join = iat.select(
    pl.col("Address").alias("Destination"), pl.col("Calladdr").alias("iat address")
).unique("Destination")
# calls without a matching slot keep a null "iat address"
calls = calls.join(iats_to_join, on="Destination", how="left")

print(calls.shape)
calls.head(3)

In [None]:
# all calls have their iat entry
# ("uniated" = number of calls for which no IAT slot was found)
uniated = calls.filter(pl.col("iat address").is_null()).shape[0]
print("Uniated:", uniated)
# assert uniated == 0, uniated
# disabled, as this is in progress

# Fix fake calls

In [None]:
def addr_expr(df: pl.DataFrame, ss: list[str], dest: pl.Expr) -> pl.DataFrame:
    """Evaluate `dest` with the hex-string columns `ss` temporarily turned into integers.

    The columns named in `ss` are parsed to integers, `dest` is added to the frame,
    and the columns in `ss` are then formatted back to normalized hex strings. The
    column produced by `dest` is left as computed unless it is itself listed in `ss`.
    """
    numeric = df
    for name in ss:
        numeric = numeric.with_columns(to_int_expr(name))

    combined = numeric.with_columns(dest)

    for name in ss:
        combined = int_to_addr(combined, name)
    return combined

In [None]:
# Attach the new section address to every double-IAT call; the section id is
# formatted as "0x0<id>0000" first so that it matches double_iat's "Section" column
double_iat_fix = double_iat.join(
    fake_sections_map.with_columns(
        ("0x0" + pl.col("Section").cast(str) + "0000").alias("Section")
    ),
    on="Section",
)

# relocate the call site: new section address + (old call address - old section address)
double_iat_fix = addr_expr(
    double_iat_fix,
    ["Baseaddr", "Section", "Call address"],
    (pl.col("Baseaddr") + pl.col("Call address") - pl.col("Section")).alias(
        "Call address"
    ),
)
double_iat_fix = double_iat_fix.drop(
    "Section", "Target", "Function", "Module", "Baseaddr"
)

# far calls are 6-byte wide
assert all('far' in fci for fci in double_iat_fix['Instruction'])

print(double_iat_fix.shape)
double_iat_fix.head(3)

In [None]:
# Attach the new section address, then format the section id as "0x0<id>0000"
fake_calls_fix = fake_calls.join(fake_sections_map, on="Section")
fake_calls_fix = fake_calls_fix.with_columns(
    ("0x0" + pl.col("Section").cast(str) + "0000").alias("Section")
)
# these are treated via double_iat_fix
fake_calls_fix = fake_calls_fix.filter(
    ~pl.col("Call address").is_in(double_iat["Call address"].to_list())
)

# relocated call site, kept in a separate column because the old call address
# is still needed below
fake_calls_fix = addr_expr(
    fake_calls_fix,
    ["Baseaddr", "Section", "Call address"],
    (pl.col("Baseaddr") + pl.col("Call address") - pl.col("Section")).alias(
        "New call address"
    ),
)
fake_calls_fix = int_to_addr(fake_calls_fix, "New call address").drop(
    "Baseaddr", "Section"
)
fake_calls_fix = normalize_address(fake_calls_fix, "New call address", sort=False)

# near calls are 5-byte wide
assert all("near" in fci for fci in fake_calls_fix["Instruction"])

# instruction length: 5 bytes for call-near/jmp-near, 6 bytes for everything else
fake_calls_fix = fake_calls_fix.with_columns(
    pl.when(pl.col("Instruction").is_in({"call-near", "jmp-near"}))
    .then(pl.lit(5).alias("shift"))
    .otherwise(pl.lit(6).alias("shift"))
)
# displacement encoded at the OLD location: destination - old call address - length
# NOTE(review): a negative displacement would be rendered as "0x-..." by
# int_to_addr — confirm that destinations always lie above the call sites
fake_calls_fix = addr_expr(
    fake_calls_fix,
    ["Destination", "Call address"],
    (pl.col("Destination") - pl.col("Call address") - pl.col("shift")).alias("Old rva"),
)
fake_calls_fix = int_to_addr(fake_calls_fix, "Old rva")
fake_calls_fix = normalize_address(fake_calls_fix, "Old rva", sort=False)
# from here on "Call address" is the relocated address
fake_calls_fix = fake_calls_fix.with_columns(
    pl.col("New call address").alias("Call address")
).drop("New call address", "shift")

print(fake_calls_fix.shape)
fake_calls_fix.head(3)

# Patch PE

In [None]:
def patch_call_to_imm(
    addr: str, inst: str, dest: str, imm_addr: str, nop_first: bool = False
) -> dict[str, str]:
    """Build the patch entry that turns a 6-byte call/jump site into FF15/FF25 + imm32.

    A 5-byte call-near / jmp-near is assumed to be padded to 6 bytes with one nop;
    `nop_first` tells whether the nop precedes (True) or follows (False) it.

    Args:
        addr: Address of the 6-byte site (hex string).
        inst: Instruction label: "call-near", "jmp-near", "call-far" or "jmp-far".
        dest: Current destination of the instruction (hex string).
        imm_addr: Address the patched instruction should reference (hex string).
        nop_first: Position of the padding nop for the 5-byte near forms.

    Returns:
        Dict with "patch_addr", "mem_old" (bytes expected at the site) and "patch".

    Raises:
        RuntimeError: For "jne-near"/"je-near" (no 6-byte replacement exists) and
            for any other unsupported label.
    """
    site = int(addr, 16)

    new_ibin = to_bin(int_to_LE(int(imm_addr, 16)))
    old_ibin = to_bin(int_to_LE(int(dest, 16)))
    # displacement measured from the end of the 6-byte site ...
    old_rbin = to_bin(rel_call(hex(site + 6), dest))
    # ... unless the 5-byte instruction comes first and the nop trails it
    if inst in ("call-near", "jmp-near") and not nop_first:
        old_rbin = to_bin(rel_call(hex(site + 5), dest))

    if inst in ("jne-near", "je-near"):
        raise RuntimeError("Need a thunk to perform conditional near jump to imm32")

    # label -> (rel32 opcode, opcode of the indirect replacement)
    near_opcodes = {"call-near": ("E8", "FF15"), "jmp-near": ("E9", "FF25")}
    far_opcodes = {"call-far": "FF15", "jmp-far": "FF25"}

    if inst in near_opcodes:
        rel_opcode, indirect_opcode = near_opcodes[inst]
        if nop_first:
            mem_old = "90" + rel_opcode + old_rbin
        else:
            mem_old = rel_opcode + old_rbin + "90"
        patch = indirect_opcode + new_ibin
    elif inst in far_opcodes:
        opcode = far_opcodes[inst]
        mem_old = opcode + old_ibin
        patch = opcode + new_ibin
    else:
        raise RuntimeError(f"Unsupported instruction {inst}")

    return {
        "patch_addr": addr,
        "mem_old": mem_old,
        "patch": patch,
    }

In [None]:
def patch_rel_call(
    addr: str, inst: str, old_rva: str, new_dest: str,
) -> dict[str, str]:
    """Build the patch entry that re-targets a rel32 call or jump.

    Args:
        addr: Address of the instruction (hex string).
        inst: Instruction label; must contain "near".
        old_rva: Displacement currently encoded in the instruction (hex string).
        new_dest: Address the instruction should reach after patching (hex string).

    Returns:
        Dict with "patch_addr", "mem_old" (opcode + old displacement) and "patch"
        (opcode + displacement towards `new_dest`).

    Raises:
        RuntimeError: If the label is not one of the four supported near forms.
    """
    assert 'near' in inst

    # call-near / jmp-near occupy 5 bytes, every other form is treated as 6 bytes
    inst_len = 5 if inst in ('call-near', 'jmp-near') else 6
    next_addr = hex(int(addr, 16) + inst_len)

    new_rbin = to_bin(rel_call(next_addr, new_dest))
    old_rbin = to_bin(int_to_LE(int(old_rva, 16)))

    opcodes = {
        "call-near": "E8",
        "jmp-near": "E9",
        "jne-near": "0F84",
        "je-near": "0F85",
    }
    opcode = opcodes.get(inst)
    if opcode is None:
        raise RuntimeError(f"Unsupported instruction {inst}")

    return {
        "patch_addr": addr,
        "mem_old": opcode + old_rbin,
        "patch": opcode + new_rbin,
    }

In [None]:
# Patch rows for the double-IAT calls: each one is redirected to its new IAT slot
patch_data = []

for call in double_iat_fix.rows():
    # column order: subroutine, instruction, call address, old destination, new destination
    sub, inst, addr, dest, iat_addr = call
    patch_data.append(patch_call_to_imm(addr, inst, dest, iat_addr))

print(len(patch_data))

In [None]:
# Patch rows for the relocated near calls: same destination, displacement
# recomputed from the new call address
for call in fake_calls_fix.rows():
    # column order: subroutine, instruction, new call address, destination, old displacement
    sub, inst, addr, dest, old_rva = call
    patch_data.append(patch_rel_call(addr, inst, old_rva, dest))

print(fake_calls_fix.shape[0])

In [None]:
# Create DataFrame in one operation
# (patch_addr is a string column, so the sort is lexicographic)
calls_patch = pl.DataFrame(patch_data, schema=patch_schema).sort("patch_addr")
calls_patch.shape

In [None]:
# Persist the call patches
calls_patch.write_csv(patch_calls_p)

Now run ida_patch.py script in IDA Pro and apply changes

# Troubleshooting

In [None]:
# Reaching this cell means every cell above ran without raising
print("OK!")