In [None]:
#!/usr/bin/env python3
# ================================================================
#  mini_x86_emulator.py — stub‑runner, ROP verifier, taint lab
#  * v3: Jupyter‑friendly (swallows ‑f, no SystemExit on no args)
# ================================================================

from __future__ import annotations
import json
import argparse
import pathlib
import sys
import tkinter.messagebox as tkmsg
from enum import Enum, auto
from typing import Dict, List, Set, Tuple



# ----------------------------------------------------------------
#  CPUFault enum (x86‑64 ring‑3 style)
# ----------------------------------------------------------------
class CPUFault(Enum):
    """Fault categories reported by the emulator (x86-64 ring-3 style)."""

    GP = 1      # General-Protection
    UD = 2      # Undefined-Opcode
    SEG = 3     # Segmentation / generic mem fault


# ----------------------------------------------------------------
#  CPU state with taint + instrumentation
# ----------------------------------------------------------------
class CPUState:
    """Tiny x86-64 CPU model used to run short code stubs.

    Holds a few registers, a dict-backed stack, sparse memory, a taint set
    and a log of every RET target.  Faults surface as ``RuntimeError``.

    Args:
        blob:    raw machine code to execute.
        base:    address at which ``blob`` is mapped; RIP starts here.
        gui:     when true, a tkinter error box is shown before a fault
                 is raised.
        stub_id: tag embedded in the names of dump files.
    """

    def __init__(self, blob: bytes, base: int = 0x1000,
                 gui: bool = False, stub_id: str = "blob"):
        self.blob, self.BASE = blob, base
        self.ip              = base

        # --- registers we actually touch
        self.rsp  = base - 1            # crafted so first RET → BASE+7
        self.rdi  = 0xDEADBEEFCAFEBABE
        self.rdx  = self.rcx = 0
        self.rax  = 0x2000              # points into test RAM

        # --- memory + taint bitmap
        # Each memory cell is keyed by its address; the seeded cell and
        # `shl [rax],cl` both treat a cell as a 32-bit word.
        self.memory: Dict[int, int] = {0x2000: 0x1234_5678}
        self.taint:  Set[int]       = set()   # byte‑address granularity

        # --- “stack” kept separate for clarity
        self.stack: Dict[int, int] = {self.rsp: base + 0x0007}

        # --- instrumentation
        self.stub_id     = stub_id
        self.ret_targets: List[int] = []
        self.gui         = gui

    # ============================================================
    #  Helpers
    # ============================================================
    def push64(self, v: int):
        """Subtract 8 from RSP and store ``v`` (truncated to 64 bits)."""
        self.rsp -= 8
        self.stack[self.rsp] = v & 0xFFFF_FFFF_FFFF_FFFF

    def pop64(self) -> int:
        """Remove and return the value stored at RSP, then add 8 to RSP.

        Raises:
            RuntimeError: SEG fault when nothing was stored at RSP.
        """
        if self.rsp not in self.stack:
            self._fault(CPUFault.SEG, "stack under‑flow")
        v = self.stack.pop(self.rsp)
        self.rsp += 8
        return v

    def _fault(self, kind: "CPUFault", msg: str) -> "NoReturn":
        """Raise ``RuntimeError`` describing the fault (after an optional
        GUI pop-up).  Never returns."""
        text = f"{kind.name} fault @ {hex(self.ip)} — {msg}"
        if self.gui:
            tkmsg.showerror("CPU fault", text)
        raise RuntimeError(text)

    # ------------------------------------------------------------
    #  Blob fetch helpers
    # ------------------------------------------------------------
    def _byte(self) -> int:
        """Fetch the byte at RIP and advance RIP; UD fault outside the blob."""
        off = self.ip - self.BASE
        if off < 0 or off >= len(self.blob):
            self._fault(CPUFault.UD, "fetch past blob")
        val = self.blob[off]
        self.ip += 1
        return val

    def _imm16(self) -> int:
        """Fetch a little-endian 16-bit immediate."""
        return self._byte() | (self._byte() << 8)

    def _imm32(self) -> int:
        """Fetch a little-endian 32-bit immediate (unsigned)."""
        return sum(self._byte() << (8 * i) for i in range(4))

    # ------------------------------------------------------------
    #  TAINT helpers
    # ------------------------------------------------------------
    def mark_tainted(self, addr: int, length: int):
        """Mark ``length`` byte addresses starting at ``addr`` as tainted."""
        self.taint.update(range(addr, addr + length))

    def _taint_load(self, addr: int, n: int) -> bool:
        """Return True if any of the ``n`` bytes at ``addr`` is tainted."""
        return any(a in self.taint for a in range(addr, addr + n))

    def _taint_store(self, addr: int, n: int, tainted: bool):
        """Taint the ``n`` bytes at ``addr`` when ``tainted``; taint is
        never cleared here."""
        if tainted:
            self.taint.update(range(addr, addr + n))

    # ------------------------------------------------------------
    #  RET (shared by C3 and C2 iw)
    # ------------------------------------------------------------
    def _ret(self, extra: int = 0):
        """Pop the return address, record/dump it, release ``extra`` more
        stack bytes and jump to it."""
        ret_ip = self.pop64()
        self.ret_targets.append(ret_ip)
        self._dump_return_target(ret_ip)
        self.rsp += extra
        self.ip  = ret_ip

    # ------------------------------------------------------------
    #  One instruction
    # ------------------------------------------------------------
    def step(self):
        """Decode and execute one instruction at RIP.

        Raises:
            RuntimeError: on any fault (unknown opcode, fetch outside the
                blob, stack under-flow, privileged instruction).
        """
        op = self._byte()

        if 0x40 <= op <= 0x4F:          # REX prefix (we only need 0x48)
            op = self._byte()

        if op == 0x54:                  # push rsp
            self.push64(self.rsp + 8)

        elif op == 0x57:                # push rdi
            self.push64(self.rdi)

        elif op == 0x5A:                # pop rdx
            self.rdx = self.pop64()

        elif op == 0xC3:                # ret  (plain near return)
            self._ret()

        elif op == 0xC2:                # ret imm16
            # the immediate is fetched before the pop, as before
            self._ret(self._imm16())

        elif op == 0xE8:                # call rel32
            rel = self._imm32()
            self.push64(self.ip)
            # rel32 is signed: sign-extend before adding to RIP
            self.ip += rel if rel < 0x8000_0000 else rel - 0x1_0000_0000

        elif op == 0x51:                # push rcx
            self.push64(self.rcx)

        elif op == 0x90:                # nop
            pass

        # NOTE: the ModRM byte is consumed while testing the condition, so
        # on a mismatch the UD fault below is reported one byte further on.
        elif op == 0xD3 and self._byte() == 0x20:     # shl [rax],cl
            word  = self.memory.get(self.rax, 0) & 0xFFFF_FFFF
            taint = self._taint_load(self.rax, 4)
            word  = (word << (self.rcx & 0x1F)) & 0xFFFF_FFFF
            self.memory[self.rax] = word
            self._taint_store(self.rax, 4, taint)

        elif op == 0x34:                # xor al, imm8
            imm8  = self._byte()
            taint = -1 in self.taint          # sentinel for RAX‑byte
            self.rax = (self.rax & ~0xFF) | ((self.rax & 0xFF) ^ imm8)
            if taint:
                self.taint.add(-1)

        elif op in (0xEF, 0xFA):        # OUT DX,EAX   or   CLI
            insn = "OUT DX,EAX" if op == 0xEF else "CLI"
            if -1 in self.taint:
                print("⚠️  TAINT: user‑data reached RAX before", insn)
            self._fault(CPUFault.GP, insn)

        elif op == 0xB1:                # mov cl, imm8
            self.rcx = (self.rcx & ~0xFF) | self._byte()

        elif op == 0xF3 and self._byte() == 0x7E:
            _ = self._byte()
            self._fault(CPUFault.UD, "REP JLE illegal combo")

        elif op == 0xDA and self._byte() == 0x05:
            _ = self._imm32()
            self._fault(CPUFault.UD, "FIMUL dword ptr")

        else:
            self._fault(CPUFault.UD, f"unknown opcode {hex(op)}")

    # ------------------------------------------------------------
    #  Dump any RET target outside the original blob
    # ------------------------------------------------------------
    def _dump_return_target(self, addr: int, span: int = 64):
        """Write ``span`` bytes at ``addr`` to a file in the current
        directory when ``addr`` lies outside the blob; otherwise do nothing."""
        if self.BASE <= addr < self.BASE + len(self.blob):
            return
        name = f"stub‑dump‑{self.stub_id}-{len(self.ret_targets)}.bin"
        with open(name, "wb") as fp:
            fp.write(self._peek(addr, span))
        print(f"📦 dumped {span} bytes at {hex(addr)} → {name}")

    def _peek(self, addr: int, n: int) -> bytes:
        """Return ``n`` bytes of memory starting at ``addr``.

        Cells are expanded little-endian over four bytes, so a word such
        as the seeded 0x12345678 no longer makes ``bytes()`` raise
        ValueError.  Unmapped bytes read as zero.
        """
        out = bytearray(n)
        for i in range(n):
            a = addr + i
            # nearest cell at or below `a` that still covers this byte
            for back in range(4):
                cell = self.memory.get(a - back)
                if cell is not None:
                    out[i] = (cell >> (8 * back)) & 0xFF
                    break
        return bytes(out)

    # ------------------------------------------------------------
    #  Run for at most `max_steps` instructions
    # ------------------------------------------------------------
    def run(self, max_steps: int = 500):
        """Step until a fault or ``max_steps``; faults are printed, not
        re-raised.  A register summary is always printed at the end."""
        try:
            for _ in range(max_steps):
                self.step()
        except RuntimeError as err:
            print("⛔", err)
        finally:
            print(f"🛑 RIP={hex(self.ip)} RSP={hex(self.rsp)} "
                  f"RDX={hex(self.rdx)} RCX={hex(self.rcx)}")
            print("    return‑targets:", [hex(t) for t in self.ret_targets])


# ----------------------------------------------------------------
#  ROP‑chain verifier
# ----------------------------------------------------------------
FORBIDDEN_REGS = {"rsp", "rip"}

def verify_rop_chain(json_path: pathlib.Path):
    """Emulate every gadget listed in a JSON file and print a verdict.

    The file holds a list of objects with a hex string under ``"bytes"``
    and an optional ``"clobbers"`` list of register names.

    ``CPUState.run()`` prints faults itself and never re-raises them, so
    wrapping it in ``try/except RuntimeError`` could not detect a bad
    gadget.  A gadget is instead judged by whether it reached a RET, i.e.
    whether ``ret_targets`` is non-empty after the run.
    """
    chain = json.loads(json_path.read_text())
    print(f"➤ Verifying ROP chain from {json_path} ({len(chain)} gadgets)")
    for idx, entry in enumerate(chain, 1):
        blob = bytes.fromhex(entry["bytes"])
        cpu  = CPUState(blob, stub_id=f"g{idx}")
        cpu.run()
        if not cpu.ret_targets:
            print(f"❌ Gadget {idx} did not reach a RET "
                  "(faulted or ran out of steps)")
            continue
        clob = [r for r in entry.get("clobbers", []) if r in FORBIDDEN_REGS]
        if clob:
            print(f"⚠️  Gadget {idx} clobbers forbidden regs:", clob)
        else:
            print(f"✅ Gadget {idx} passed")


# ----------------------------------------------------------------
#  CLI driver (Jupyter‑safe)
# ----------------------------------------------------------------
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser shared by script and notebook use."""
    parser = argparse.ArgumentParser(
        description="Tiny x86‑64 stub emulator / ROP verifier / taint lab")

    # (flags, keyword arguments) for each option, in registration order.
    specs = (
        # Jupyter/Colab pass "-f <connection.json>"; accept it silently.
        (("-f",), dict(help=argparse.SUPPRESS)),
        (("blob",), dict(
            nargs="?",
            help="hex‑file path / text file with hex, or literal hex")),
        (("--rop-json",), dict(
            type=pathlib.Path,
            help="verify ROP chain described by JSON file")),
        (("--gui",), dict(
            action="store_true",
            help="show GUI pop‑ups (tkinter) on faults")),
        (("--taint",), dict(
            metavar="ADDR:LEN",
            help="pre‑taint bytes at ADDR (hex) for LEN bytes")),
    )
    for flags, options in specs:
        parser.add_argument(*flags, **options)
    return parser


def cli_main(argv: List[str] | None = None):
    """Command-line entry point; never raises SystemExit for missing args.

    Args:
        argv: argument list like ``sys.argv[1:]``; ``None`` lets argparse
            read the process arguments.  Unknown arguments are ignored.
    """
    parser = _build_arg_parser()
    args, _ignored = parser.parse_known_args(argv)

    # ROP-chain mode takes precedence over a positional blob.
    if args.rop_json:
        verify_rop_chain(args.rop_json)
        return

    # No blob: print usage plus an error line instead of exiting.
    if not args.blob:
        parser.print_usage(sys.stderr)
        print("error: need BLOB argument or --rop-json", file=sys.stderr)
        return

    # A ".hex"/".txt" argument is a file containing hex; anything else is
    # taken as the hex digits themselves.
    from_file = args.blob.endswith((".hex", ".txt"))
    hex_text = pathlib.Path(args.blob).read_text() if from_file else args.blob
    blob_bytes = bytes.fromhex(hex_text.strip())

    cpu = CPUState(blob_bytes, gui=args.gui)
    if args.taint:
        # "ADDR:LEN", both fields in hexadecimal
        addr, length = map(lambda field: int(field, 16),
                           args.taint.split(":"))
        cpu.mark_tainted(addr, length)
    cpu.run()


# ----------------------------------------------------------------
#  Only auto‑run CLI if *really* invoked as a script
#  (importing or pasting in a notebook won’t trigger it)
# ----------------------------------------------------------------
if __name__ == "__main__":
    # Auto-run the CLI only when `get_ipython` is absent from this
    # namespace, i.e. the code was not pasted into an IPython/Jupyter cell.
    if "get_ipython" not in globals():
        cli_main()


In [None]:
!python mini_x86_example.py     # launches a separate process


python3: can't open file '/content/mini_x86_example.py': [Errno 2] No such file or directory



You now have **two easy ways to make it run inside the notebook**:

---

### 1 · Run it as an external program

```python
# add a leading bang so Colab spawns a real subprocess
!python mini_x86_emulator.py 54c20800        # tiny stub: push rsp; ret 8
```

*Replace `54c20800` with any hex blob, or add `--rop-json my_chain.json`.*

Because this launches a separate Python process, the hidden `-f` flag
isn’t injected and everything works exactly like a normal shell.

---

### 2 · Call the CLI from Python

```python
import mini_x86_emulator as emu

# example: one‑byte NOP (90) followed by a plain RET (C3)
emu.cli_main(["90c3", "--taint", "2000:4"])
```

`cli_main([...])` takes a list just like `sys.argv[1:]`, so you can pass
whatever flags/blobs you like (or build them programmatically).

---

#### Why the guard exists

```python
if "get_ipython" in globals():
    pass   # skip auto‑CLI
```

`%run mini_x86_emulator.py` executes the file *inside the notebook’s own
Python process*; `get_ipython` is therefore present.  We skip the CLI to
prevent accidental `SystemExit` when you’re just importing/pasting the
module.  If you’d prefer the old behaviour, comment‑out those two lines.

---

##### Tiny self‑test blob (copy‑paste)

```
54 57 5A C2 08 00
```

| Hex        | Disassembly |
|------------|-------------|
| `54`       | `push rsp`  |
| `57`       | `push rdi`  |
| `5A`       | `pop  rdx`  |
| `C2 08 00` | `ret 8`     |

Run it:

```python
!python mini_x86_emulator.py 54575ac20800
```

and you should see the emulator step a few instructions, dump the final
registers, and exit.

Once you pass a real blob or ROP‑JSON you’ll get the full instrumented
output.


In [None]:
%%writefile mini_x86_emulator.py
#!/usr/bin/env python3
# ================================================================
#  mini_x86_emulator.py — stub‑runner, ROP verifier, taint lab
#  (v3 – Jupyter‑friendly)
# ================================================================

from __future__ import annotations
import json, argparse, pathlib, sys, tkinter.messagebox as tkmsg
from enum import Enum, auto
from typing import Dict, List, Set


# ----------------------------------------------------------------
#  CPUFault enum
# ----------------------------------------------------------------
class CPUFault(Enum):
    """Kinds of fault the emulator can report."""

    GP = 1      # general protection
    UD = 2      # undefined opcode
    SEG = 3     # segmentation / generic memory fault


# ----------------------------------------------------------------
#  CPU state with taint + instrumentation
# ----------------------------------------------------------------
class CPUState:
    """Tiny x86-64 CPU model: a few registers, dict-backed stack and
    memory, a taint set and a log of RET targets.

    Args:
        blob:    machine code to run.
        base:    address where ``blob`` is mapped (initial RIP).
        gui:     show a tkinter error box before raising a fault.
        stub_id: tag used in dump-file names.
    """

    def __init__(self, blob: bytes, base: int = 0x1000,
                 gui: bool = False, stub_id: str = "blob"):
        self.blob, self.BASE = blob, base
        self.ip              = base

        self.rsp  = base - 1
        self.rdi  = 0xDEADBEEFCAFEBABE
        self.rdx  = self.rcx = 0
        self.rax  = 0x2000

        # memory cells are keyed by address; `shl [rax],cl` and the seed
        # value treat a cell as a 32-bit word
        self.memory: Dict[int, int] = {0x2000: 0x1234_5678}
        self.taint:  Set[int]       = set()
        self.stack: Dict[int, int]  = {self.rsp: base + 0x0007}

        self.stub_id     = stub_id
        self.ret_targets: List[int] = []
        self.gui         = gui

    # ===== helpers =================================================
    def push64(self, v: int):
        """Subtract 8 from RSP and store ``v`` truncated to 64 bits."""
        self.rsp -= 8
        self.stack[self.rsp] = v & 0xFFFFFFFFFFFFFFFF

    def pop64(self) -> int:
        """Remove and return the value at RSP, then add 8 to RSP; raises
        a SEG fault (RuntimeError) if nothing is stored there."""
        if self.rsp not in self.stack:
            self._fault(CPUFault.SEG, "stack under‑flow")
        v = self.stack.pop(self.rsp)
        self.rsp += 8
        return v

    def _fault(self, kind: "CPUFault", msg: str):
        """Raise RuntimeError for the fault, after an optional GUI pop-up."""
        txt = f"{kind.name} fault @ {hex(self.ip)} — {msg}"
        if self.gui:
            tkmsg.showerror("CPU fault", txt)
        raise RuntimeError(txt)

    # ----- fetch ---------------------------------------------------
    def _byte(self) -> int:
        """Fetch the byte at RIP and advance; UD fault outside the blob."""
        off = self.ip - self.BASE
        if off < 0 or off >= len(self.blob):
            self._fault(CPUFault.UD, "fetch past blob")
        b = self.blob[off]
        self.ip += 1
        return b

    def _imm16(self):
        """Fetch a little-endian 16-bit immediate."""
        return self._byte() | (self._byte() << 8)

    def _imm32(self):
        """Fetch a little-endian 32-bit immediate (unsigned)."""
        return sum(self._byte() << (8*i) for i in range(4))

    # ----- taint helpers ------------------------------------------
    def mark_tainted(self, addr: int, length: int):
        """Taint ``length`` byte addresses starting at ``addr``."""
        self.taint.update(range(addr, addr + length))

    def _taint_load(self, addr: int, n: int) -> bool:
        """True if any of the ``n`` bytes at ``addr`` is tainted."""
        return any(a in self.taint for a in range(addr, addr + n))

    def _taint_store(self, addr: int, n: int, tainted: bool):
        """Taint ``n`` bytes at ``addr`` when ``tainted`` (never clears)."""
        if tainted:
            self.taint.update(range(addr, addr + n))

    # ----- ret (shared by C3 and C2 iw) ---------------------------
    def _ret(self, extra: int = 0):
        """Pop the return address, record/dump it, release ``extra`` more
        stack bytes and jump to it."""
        ret = self.pop64()
        self.ret_targets.append(ret)
        self._dump_return_target(ret)
        self.rsp += extra
        self.ip  = ret

    # ----- single step --------------------------------------------
    def step(self):
        """Decode and execute one instruction; faults raise RuntimeError."""
        op = self._byte()
        if 0x40 <= op <= 0x4F:          # REX (we only need 0x48)
            op = self._byte()

        if op == 0x54:      self.push64(self.rsp + 8)              # push rsp
        elif op == 0x57:    self.push64(self.rdi)                  # push rdi
        elif op == 0x5A:    self.rdx = self.pop64()                # pop  rdx
        elif op == 0x51:    self.push64(self.rcx)                  # push rcx
        elif op == 0x90:    pass                                   # nop
        elif op == 0xB1:    self.rcx = (self.rcx & ~0xFF) | self._byte()
        elif op == 0xC3:    self._ret()                            # ret
        elif op == 0xC2:    self._ret(self._imm16())               # ret imm16
        elif op == 0xE8:                                          # call rel32
            rel = self._imm32()
            self.push64(self.ip)
            # rel32 is signed: sign-extend before adding to RIP
            self.ip += rel if rel < 0x80000000 else rel - 0x100000000
        # NOTE: the ModRM byte is consumed while testing the condition
        elif op == 0xD3 and self._byte() == 0x20:                  # shl [rax],cl
            word  = self.memory.get(self.rax, 0) & 0xFFFFFFFF
            taint = self._taint_load(self.rax, 4)
            word  = (word << (self.rcx & 0x1F)) & 0xFFFFFFFF
            self.memory[self.rax] = word
            self._taint_store(self.rax, 4, taint)
        elif op == 0x34:                                          # xor al,imm8
            # -1 in the taint set is the sentinel for the RAX low byte
            imm8, taint = self._byte(), (-1 in self.taint)
            self.rax = (self.rax & ~0xFF) | ((self.rax & 0xFF) ^ imm8)
            if taint: self.taint.add(-1)
        elif op in (0xEF, 0xFA):                                  # OUT/CLI
            name = "OUT DX,EAX" if op == 0xEF else "CLI"
            if -1 in self.taint:
                print("⚠️  TAINT reached RAX before", name)
            self._fault(CPUFault.GP, name)
        elif op == 0xF3 and self._byte() == 0x7E:                 # REP JLE
            _ = self._byte()
            self._fault(CPUFault.UD, "REP JLE illegal combo")
        elif op == 0xDA and self._byte() == 0x05:                 # FIMUL
            _ = self._imm32()
            self._fault(CPUFault.UD, "FIMUL dword ptr")
        else:
            self._fault(CPUFault.UD, f"unknown opcode {hex(op)}")

    # ----- helpers -------------------------------------------------
    def _dump_return_target(self, addr: int, span: int = 64):
        """Write ``span`` bytes at ``addr`` to a file in the current
        directory when ``addr`` is outside the blob; otherwise do nothing."""
        if self.BASE <= addr < self.BASE + len(self.blob):
            return
        fname = f"stub‑dump‑{self.stub_id}-{len(self.ret_targets)}.bin"
        with open(fname, "wb") as fp:
            fp.write(self._peek(addr, span))
        print(f"📦 dumped {span} bytes at {hex(addr)} → {fname}")

    def _peek(self, addr: int, n: int) -> bytes:
        """Return ``n`` bytes of memory at ``addr``.

        Cells are expanded little-endian over four bytes so a word value
        (e.g. the seeded 0x12345678) no longer makes ``bytes()`` raise
        ValueError.  Unmapped bytes read as zero.
        """
        out = bytearray(n)
        for i in range(n):
            a = addr + i
            for back in range(4):       # nearest cell covering this byte
                cell = self.memory.get(a - back)
                if cell is not None:
                    out[i] = (cell >> (8 * back)) & 0xFF
                    break
        return bytes(out)

    # ----- run -----------------------------------------------------
    def run(self, max_steps=500):
        """Step until a fault or ``max_steps``; faults are printed, not
        re-raised, and a register summary is always printed."""
        try:
            for _ in range(max_steps):
                self.step()
        except RuntimeError as e:
            print("⛔", e)
        finally:
            print(f"🛑 RIP={hex(self.ip)} RSP={hex(self.rsp)} "
                  f"RDX={hex(self.rdx)} RCX={hex(self.rcx)}")
            print("    return‑targets:", [hex(t) for t in self.ret_targets])


# ----------------------------------------------------------------
#  ROP verifier
# ----------------------------------------------------------------
FORBIDDEN_REGS = {"rsp", "rip"}

def verify_rop_chain(json_path: pathlib.Path):
    """Emulate each gadget from a JSON list and print a verdict.

    Each entry carries a hex string under ``"bytes"`` and an optional
    ``"clobbers"`` list.  ``CPUState.run()`` prints faults and never
    re-raises, so catching RuntimeError around it could never fire; a
    gadget is judged by whether it reached a RET (``ret_targets`` is
    non-empty after the run).
    """
    chain = json.loads(json_path.read_text())
    print(f"➤ Verifying ROP chain from {json_path} ({len(chain)} gadgets)")
    for i, g in enumerate(chain, 1):
        cpu = CPUState(bytes.fromhex(g["bytes"]), stub_id=f"g{i}")
        cpu.run()
        if not cpu.ret_targets:
            print(f"❌ Gadget {i} did not reach a RET "
                  "(faulted or ran out of steps)")
            continue
        bad = [r for r in g.get("clobbers", []) if r in FORBIDDEN_REGS]
        print("⚠️ clobbers forbidden regs:" if bad else "✅ passed", bad or "")


# ----------------------------------------------------------------
#  CLI (Jupyter‑safe)
# ----------------------------------------------------------------
def _arg_parser() -> argparse.ArgumentParser:
    """Return the CLI parser (tolerates the notebook-injected ``-f``)."""
    parser = argparse.ArgumentParser(
        description="Tiny x86‑64 stub emulator / ROP verifier / taint lab")
    add = parser.add_argument
    add('-f', help=argparse.SUPPRESS)       # swallow notebook flag
    add("blob", nargs="?", help="hex file / literal hex")
    add("--rop-json", type=pathlib.Path, help="verify ROP chain JSON file")
    add("--gui", action="store_true", help="tkinter pop‑ups")
    add("--taint", metavar="ADDR:LEN", help="mark bytes tainted (hex)")
    return parser

def cli_main(argv=None):
    """Command-line entry point; returns instead of exiting on missing args.

    Args:
        argv: argument list like ``sys.argv[1:]``; ``None`` lets argparse
            read the process arguments.  Unknown arguments are ignored.
    """
    ap = _arg_parser()
    args, _ = ap.parse_known_args(argv)

    if args.rop_json:
        verify_rop_chain(args.rop_json)
        return

    if not args.blob:
        ap.print_usage(sys.stderr)
        print("error: need BLOB argument or --rop-json", file=sys.stderr)
        return

    # ".hex"/".txt" names a file holding hex text; anything else is the
    # hex itself.  The module imports `pathlib`, not `Path`, so the bare
    # `Path(...)` used previously raised NameError for file arguments.
    if args.blob.endswith((".hex", ".txt")):
        blob = bytes.fromhex(pathlib.Path(args.blob).read_text().strip())
    else:
        blob = bytes.fromhex(args.blob.strip())

    cpu = CPUState(blob, gui=args.gui)
    if args.taint:
        # "ADDR:LEN", both fields hexadecimal
        addr, length = (int(x, 16) for x in args.taint.split(":"))
        cpu.mark_tainted(addr, length)
    cpu.run()

# Auto-run the CLI only for a real script invocation whose namespace has
# no `get_ipython` (i.e. not code pasted into a notebook cell).
if __name__ == "__main__" and "get_ipython" not in globals():
    cli_main()


Writing mini_x86_emulator.py


In [None]:
%%writefile mini_x86_example.py
#!/usr/bin/env python3
# ================================================================
#  mini_x86_example.py — three quick demos
# ================================================================

import json
from pathlib import Path
import mini_x86_emulator as emu


# 1 · CLI‑style inside this process
def demo_cli():
    """Run the sample stub through the CLI entry point, in-process."""
    print("\n=== demo_cli =============================================")
    stub_hex = "54575ac20800"   # push rsp; push rdi; pop rdx; ret 8
    emu.cli_main([stub_hex])


# 2 · Direct API
def demo_api():
    """Run the sample stub by driving ``CPUState`` directly."""
    print("\n=== demo_api =============================================")
    stub = bytes.fromhex("54575ac20800")
    emu.CPUState(stub, stub_id="api").run(max_steps=10)


# 3 · ROP‑chain verifier
def demo_rop():
    """Write a two-gadget chain to a temporary JSON file, verify it via
    the CLI, and delete the file afterwards (even on error)."""
    print("\n=== demo_rop =============================================")
    gadgets = [
        {"bytes": "90c3", "clobbers": []},                # NOP;RET
        {"bytes": "54575ac20800", "clobbers": ["rdx"]},   # the stub
    ]
    chain_file = Path("rop_demo.json")
    chain_file.write_text(json.dumps(gadgets, indent=2))
    try:
        emu.cli_main(["--rop-json", str(chain_file)])
    finally:
        chain_file.unlink(missing_ok=True)


if __name__ == "__main__":
    # Run the three demos in order.
    for _demo in (demo_cli, demo_api, demo_rop):
        _demo()


Writing mini_x86_example.py


In [None]:
# run in a separate process
!python mini_x86_example.py



📦 dumped 64 bytes at 0x1007 → stub‑dump‑blob-1.bin
⛔ UD fault @ 0x1007 — fetch past blob
🛑 RIP=0x1007 RSP=0x1007 RDX=0xdeadbeefcafebabe RCX=0x0
    return‑targets: ['0x1007']

📦 dumped 64 bytes at 0x1007 → stub‑dump‑api-1.bin
⛔ UD fault @ 0x1007 — fetch past blob
🛑 RIP=0x1007 RSP=0x1007 RDX=0xdeadbeefcafebabe RCX=0x0
    return‑targets: ['0x1007']

➤ Verifying ROP chain from rop_demo.json (2 gadgets)
⛔ UD fault @ 0x1002 — unknown opcode 0xc3
🛑 RIP=0x1002 RSP=0xfff RDX=0x0 RCX=0x0
    return‑targets: []
✅ passed 
📦 dumped 64 bytes at 0x1007 → stub‑dump‑g2-1.bin
⛔ UD fault @ 0x1007 — fetch past blob
🛑 RIP=0x1007 RSP=0x1007 RDX=0xdeadbeefcafebabe RCX=0x0
    return‑targets: ['0x1007']
✅ passed 


In [None]:
# or import & run in‑kernel
import mini_x86_example


In [None]:
#!/usr/bin/env python3
# ================================================================
#  example_app.py — quick tour of mini_x86_emulator
# ================================================================

import json
from pathlib import Path
import mini_x86_emulator as emu


# ----------------------------------------------------------------
# 1 · Run a tiny stub end‑to‑end
# ----------------------------------------------------------------
def run_stub():
    """Run a five-instruction stub through the emulator's CLI entry point."""
    print("\n=== 1 · Single‑stub run ==================================")
    # One hex fragment per instruction; joined they form the stub.
    fragments = (
        "54",       # push rsp
        "57",       # push rdi
        "5a",       # pop  rdx
        "34aa",     # xor  al,0xAA
        "c20800",   # ret  8
    )
    emu.cli_main(["".join(fragments)])


# ----------------------------------------------------------------
# 2 · Demonstrate taint propagation
# ----------------------------------------------------------------
def taint_demo():
    """Run the stub with four bytes at 0x2000 pre-tainted."""
    print("\n=== 2 · Taint‑tracking demo ==============================")
    # "--taint 2000:4" marks bytes 0x2000–0x2003 as tainted before the run.
    # NOTE(review): this stub (push rsp; push rdi; pop rdx; xor al,0xAA;
    # ret 8) has no shift or memory access, so nothing appears to read the
    # tainted bytes — confirm this is the intended demo.
    cli_args = ["54575a34aac20800", "--taint", "2000:4"]
    emu.cli_main(cli_args)


# ----------------------------------------------------------------
# 3 · Verify a two‑gadget ROP chain
# ----------------------------------------------------------------
def verify_rop():
    """Write a two-gadget chain to JSON, verify it via the CLI, and always
    delete the file afterwards."""
    print("\n=== 3 · ROP‑chain verifier ===============================")
    # Gadget 1: NOP ; RET
    first = {"bytes": "90c3", "clobbers": []}
    # Gadget 2: the tiny stub; it is declared as touching RDX
    second = {"bytes": "54575a34aac20800", "clobbers": ["rdx"]}

    json_file = Path("rop_chain_demo.json")
    json_file.write_text(json.dumps([first, second], indent=2))
    try:
        emu.cli_main(["--rop-json", str(json_file)])
    finally:
        # tidy up whether or not verification raised
        json_file.unlink(missing_ok=True)


# ----------------------------------------------------------------
if __name__ == "__main__":
    # Run the three tour stages in order.
    for _stage in (run_stub, taint_demo, verify_rop):
        _stage()



⛔ UD fault @ 0x1008 — unknown opcode 0x0
🛑 RIP=0x1008 RSP=0x1007 RDX=0xdeadbeefcafebabe RCX=0x0
    return‑targets: ['0x1007']

⛔ UD fault @ 0x1008 — unknown opcode 0x0
🛑 RIP=0x1008 RSP=0x1007 RDX=0xdeadbeefcafebabe RCX=0x0
    return‑targets: ['0x1007']

➤ Verifying ROP chain from rop_chain_demo.json (2 gadgets)
⛔ UD fault @ 0x1002 — unknown opcode 0xc3
🛑 RIP=0x1002 RSP=0xfff RDX=0x0 RCX=0x0
    return‑targets: []
✅ passed 
⛔ UD fault @ 0x1008 — unknown opcode 0x0
🛑 RIP=0x1008 RSP=0x1007 RDX=0xdeadbeefcafebabe RCX=0x0
    return‑targets: ['0x1007']
✅ passed 


In [None]:
#!/usr/bin/env python3
# ================================================================
#  symbolic_addons.py — symbolic / concolic / JIT extensions
# ================================================================

# ---------- auto-install missing deps ---------------------------------------
def _ensure(pkg, pypi=None):
    """Import and return module ``pkg``, pip-installing it on ImportError.

    Args:
        pkg:  import name of the module.
        pypi: distribution name to install when it differs from ``pkg``.

    Raises:
        subprocess.CalledProcessError: if the pip invocation fails.
    """
    try:
        module = __import__(pkg)
    except ImportError:
        import subprocess, sys, importlib
        install_cmd = [sys.executable, "-m", "pip", "install", "--quiet"]
        install_cmd.append(pypi or pkg)
        subprocess.check_call(install_cmd)
        # make the freshly installed package visible to the import system
        importlib.invalidate_caches()
        module = __import__(pkg)
    return module

z3        = _ensure("z3", "z3-solver")
llvmlite  = _ensure("llvmlite")

from z3 import BitVec, BitVecVal, Solver, sat, Concat, Extract
from ctypes import CFUNCTYPE
from llvmlite import ir, binding
import mini_x86_emulator as base


# ---------- helpers ----------------------------------------------------------
def _to_bv(v, bits=64):
    """Return ``v`` unchanged if it has a ``sort`` attribute (treated as an
    existing z3 term); otherwise wrap it as a ``bits``-wide constant,
    masked to that width."""
    if hasattr(v, "sort"):
        return v
    mask = (1 << bits) - 1
    return BitVecVal(v & mask, bits)


# -----------------------------------------------------------------------------
#  1 · Symbolic CPU
# -----------------------------------------------------------------------------
class SymbolicCPU(base.CPUState):
    """CPUState extended with a z3 solver, named symbols and a symbolic
    memory overlay.

    Two extra instructions are handled here (``mov eax, imm32`` and
    ``cmp [rip+disp32], eax``); everything else is delegated to the base
    emulator.

    NOTE(review): once ``mov eax`` has run, ``self.rax`` is a z3 term;
    base-class instructions that do integer arithmetic on RAX are not
    adapted for that — confirm before mixing them.
    """

    def __init__(self, blob: bytes, base_addr: int = 0x1000):
        super().__init__(blob, base=base_addr)
        self.solver  = Solver()
        self.symbols = {}     # symbol name -> z3 BitVec
        self.sym_mem = {}     # address -> z3 term, overrides self.memory

    # ---- memory helpers -----------------------------------------------------
    def _read_mem(self, addr, size=8):
        """Return the symbolic value at ``addr`` if present, else the
        concrete cell (0 when unmapped).  ``size`` is unused here."""
        return self.sym_mem.get(addr, self.memory.get(addr, 0))

    def _write_mem(self, addr, val, size=8):
        """Store ``val`` at ``addr`` as a term of ``8 * size`` bits."""
        self.sym_mem[addr] = _to_bv(val, 8 * size)

    # ---- fresh symbol -------------------------------------------------------
    def _fresh(self, tag="tmp", bits=64):
        """Create, register and return a new uniquely named BitVec."""
        name = f"{tag}_{len(self.symbols)}"
        bv   = BitVec(name, bits)
        self.symbols[name] = bv
        return bv

    # ---- override step ------------------------------------------------------
    def step(self):
        """Execute one instruction, handling the symbolic opcodes locally.

        RIP is rewound before delegating: the base ``step()`` fetches the
        opcode itself, so without the rewind every delegated instruction
        was skipped and its next byte was decoded as the opcode instead.
        """
        start_ip = self.ip
        op = self._byte()

        # MOV EAX, imm32   (0xB8 + imm32)
        if op == 0xB8:
            imm32 = self._imm32()
            hi    = Extract(63, 32, _to_bv(self.rax))    # old high dword
            lo    = BitVecVal(imm32, 32)                 # new low  dword
            self.rax = Concat(hi, lo)
            return

        # CMP DWORD PTR [rip+disp32], eax   (pattern 39 05 xx xx xx xx)
        if op == 0x39 and self._byte() == 0x05:
            disp  = self._imm32()
            if disp >= 0x8000_0000:          # disp32 is signed
                disp -= 0x1_0000_0000
            addr  = self.ip + disp
            # NOTE(review): assumes any symbolic cell read here is 32 bits wide
            mem_v = _to_bv(self._read_mem(addr, 4), 32)
            # Take the low dword as a 32-bit term; masking a 64-bit term
            # kept it 64 bits wide and clashed with mem_v's sort.
            eax   = Extract(31, 0, _to_bv(self.rax))
            self.solver.add(mem_v == eax)
            return

        self.ip = start_ip
        super().step()

    # ---- path exploration ---------------------------------------------------
    def explore_paths(self, max_paths=10, max_steps=200):
        """Run clones of this CPU until they fault and collect, per
        feasible path, its constraints and a model of the symbols.

        Returns:
            list of ``{"constraints": [...], "model": {name: value}}``.
        """
        stack = [(self.clone(), 0)]
        paths = []

        while stack and len(paths) < max_paths:
            cpu, steps = stack.pop()
            while steps < max_steps:
                try:
                    cpu.step(); steps += 1
                except RuntimeError:
                    # model() is only available after a satisfiable check()
                    if cpu.solver.check() == sat:
                        model = cpu.solver.model()
                        paths.append({
                            "constraints": list(cpu.solver.assertions()),
                            "model": {n: model.eval(v)
                                      for n, v in cpu.symbols.items()}
                        })
                    break

            for c in cpu.solver.assertions():
                # structural comparison; do not rely on the truthiness of
                # a z3 expression
                if c.num_args() == 2 and not c.arg(0).eq(c.arg(1)):
                    alt = cpu.clone()
                    # NOTE(review): `alt` still carries `c` itself, so this
                    # negation looks unsatisfiable for an equality `c` —
                    # confirm the intended forking strategy.
                    alt.solver.add(c.arg(0) != c.arg(1))
                    if alt.solver.check() == sat:
                        stack.append((alt, 0))
                    break
        return paths

    def clone(self):
        """Return an independent SymbolicCPU copy of this state.

        Dict/set/list attributes are copied so that stepping the clone
        does not mutate this CPU's stack, memory, taint set or RET log.
        The clone is always a plain SymbolicCPU, even for subclasses.
        """
        dup = SymbolicCPU(self.blob, self.BASE)
        for k, v in self.__dict__.items():
            if k in ("solver", "symbols", "sym_mem"):
                continue
            dup.__dict__[k] = v.copy() if isinstance(v, (dict, set, list)) else v
        dup.symbols = dict(self.symbols)
        dup.sym_mem = dict(self.sym_mem)
        dup.solver  = Solver(); dup.solver.append(*self.solver.assertions())
        return dup


# -----------------------------------------------------------------------------
#  2 · Concolic CPU
# -----------------------------------------------------------------------------
class ConcolicCPU(SymbolicCPU):
    """SymbolicCPU that pins selected memory addresses to concrete inputs
    while recording the matching symbolic constraints.

    Args:
        blob:            machine code to run.
        concrete_inputs: address -> concrete value returned for reads.
    """

    # RIP values treated as "vulnerable" when reached
    VULN_SITES = {0xDEADFEED, 0xCAFEBABE}

    def __init__(self, blob: bytes, concrete_inputs: dict[int, int]):
        super().__init__(blob)
        self.c_inputs = concrete_inputs

    def _read_mem(self, addr, size=8):
        """Return the concrete input for ``addr`` when one exists and
        constrain its symbol to that value; otherwise defer to the parent.

        The symbol is created only when missing and then stored.  Passing
        ``self._fresh(...)`` as the ``dict.get`` default evaluated it on
        every read, minting a new, unstored symbol each time.
        """
        if addr in self.c_inputs:
            conc = self.c_inputs[addr]
            sym  = self.sym_mem.get(addr)
            if sym is None:
                sym = self._fresh(f"in_{addr:x}", 8*size)
                self.sym_mem[addr] = sym
            self.solver.add(sym == conc)
            return conc
        return super()._read_mem(addr, size)

    def generate_poc(self):
        """Print a model of all symbols if the constraints are satisfiable."""
        if self.solver.check() == sat:
            model = self.solver.model()
            print("=== PoC ===")
            for n, v in self.symbols.items():
                print(f"{n} = {model.eval(v)}")

    def detect_vulns(self, steps=500):
        """Step up to ``steps`` instructions, reporting a PoC when RIP
        reaches a vulnerable site or a fault mentions a write.

        Execution stops at the first fault of any kind.  Other faults were
        previously ignored and the loop kept stepping a halted CPU.
        """
        for _ in range(steps):
            try:
                self.step()
            except RuntimeError as e:
                if "write" in str(e).lower():
                    print("Write violation:", e)
                    self.generate_poc()
                break
            if self.ip in self.VULN_SITES:
                print(f"Reached vuln site {hex(self.ip)}")
                self.generate_poc()
                break


# -----------------------------------------------------------------------------
#  3 · Exploit helper & equivalence
# -----------------------------------------------------------------------------
def generate_exploit(cpu: SymbolicCPU, target_ip: int):
    """Return symbol assignments under which ``cpu.ip == target_ip`` holds
    together with the CPU's constraints, or ``None`` if there are none.

    The query runs inside a solver push/pop scope so the extra constraint
    does not stay in ``cpu.solver``; added permanently, a failed query
    left the solver unsatisfiable for every later use.
    """
    solver = cpu.solver
    solver.push()
    try:
        solver.add(cpu.ip == target_ip)
        if solver.check() != sat:
            return None
        m = solver.model()
        return {n: m.eval(v) for n, v in cpu.symbols.items()}
    finally:
        solver.pop()


def equivalent(cpu1: base.CPUState, cpu2: base.CPUState, steps=100):
    """Step both CPUs in lockstep and report whether RAX always agrees.

    Returns False as soon as RAX can differ or only one CPU faults.  When
    both fault on the same step they are treated as having halted
    together and the result is decided by the RAX comparison.  Any fault
    used to return False, so even two identical programs that ran off the
    end of their blob were reported as different.
    """
    def _advance(cpu):
        # True if the CPU executed an instruction, False if it faulted
        try:
            cpu.step()
        except RuntimeError:
            return False
        return True

    for _ in range(steps):
        ran1, ran2 = _advance(cpu1), _advance(cpu2)
        if ran1 != ran2:
            return False
        s = Solver(); s.add(_to_bv(cpu1.rax) != _to_bv(cpu2.rax))
        if s.check() == sat:
            return False
        if not ran1:            # both halted in agreement
            return True
    return True


# -----------------------------------------------------------------------------
#  4 · Mini JIT via llvmlite
# -----------------------------------------------------------------------------
class JITCompiler:
    """Builds a void ``execute()`` function with llvmlite and runs it
    through MCJIT."""

    def __init__(self):
        # LLVM binding initialisation, in the original order
        binding.initialize()
        binding.initialize_native_target()
        binding.initialize_native_asmprinter()

        self.mod = ir.Module(name="emu_jit")
        void_fn_type = ir.FunctionType(ir.VoidType(), [])
        self.fn = ir.Function(self.mod, void_fn_type, name="execute")
        entry_block = self.fn.append_basic_block()
        self.bld = ir.IRBuilder(entry_block)

    def compile_instruction(self, opcode, *args):
        """Emit an integer add of two 64-bit constants (opcode 0x01 only).

        The add result is not stored or returned.

        Raises:
            NotImplementedError: for any other opcode.
        """
        if opcode != 0x01:
            raise NotImplementedError("only ADD")
        lhs, rhs = (ir.Constant(ir.IntType(64), value) for value in args)
        self.bld.add(lhs, rhs)

    def finalize_and_run(self):
        """Terminate the function, verify and JIT the module, then call
        ``execute()`` natively."""
        self.bld.ret_void()
        llvm_mod = binding.parse_assembly(str(self.mod))
        llvm_mod.verify()
        machine = binding.Target.from_default_triple().create_target_machine()
        with binding.create_mcjit_compiler(llvm_mod, machine) as engine:
            engine.finalize_object()
            entry_addr = engine.get_function_address("execute")
            CFUNCTYPE(None)(entry_addr)()


# -----------------------------------------------------------------------------
#  5 · Smoke-test
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    print("=== Smoke test ===")
    stub = bytes.fromhex("B841000000C20000")  # MOV EAX,0x41 ; RET 0

    # symbolic run
    sym = SymbolicCPU(stub)
    sym.run()

    # concolic run with one concrete input
    con = ConcolicCPU(stub, {0x2000: 0x41414141})
    con.detect_vulns()

    # lockstep comparison of two fresh concrete CPUs, three steps
    same = equivalent(base.CPUState(stub), base.CPUState(stub), 3)
    print("Equivalence:", same)

    # JIT a single ADD and execute it
    jit = JITCompiler()
    jit.compile_instruction(0x01, 2, 3)
    jit.finalize_and_run()


=== Smoke test ===
⛔ UD fault @ 0x1007 — unknown opcode 0x0
🛑 RIP=0x1007 RSP=0xfff RDX=0x0 RCX=0x0
    return‑targets: []
Equivalence: False


In [None]:
%%writefile symbolic_addons.py
# (paste the entire block above)

# NOTE(review): this cell starts with %%writefile, so these lines are
# written into symbolic_addons.py rather than executed — the module would
# then import itself.  They probably belong in a separate cell; confirm.
import symbolic_addons as sym

shellcode = bytes.fromhex("B841000000C20000")   # MOV EAX,0x41 ; RET 0
cpu = sym.SymbolicCPU(shellcode)
cpu.run()
print(cpu.symbols)


Writing symbolic_addons.py
