Skip to content

Commit

Permalink
Initial work for flexible tracing support
Browse files Browse the repository at this point in the history
  • Loading branch information
mrexodia committed Apr 9, 2023
1 parent 3e29204 commit bbbc3ca
Showing 1 changed file with 165 additions and 93 deletions.
258 changes: 165 additions & 93 deletions src/dumpulator/dumpulator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import ctypes
from io import TextIOBase
import struct
import sys
import traceback
Expand Down Expand Up @@ -267,6 +268,130 @@ def __call__(self, name: str):
diff = self.time - prev
print(f"{name}: {diff*1000:.0f}ms")

class AbstractTrace:
    """Base class for instruction tracers attached to a Dumpulator instance.

    Subclasses must override step(); start() and stop() toggle tracing on
    the owning emulator via its set_tracing() method, and close()/flush()
    are no-op hooks by default.
    """

    def __init__(self, dp: "Dumpulator"):
        # The emulator this trace observes and controls.
        self.dp = dp

    # TODO: other events?

    def start(self) -> None:
        """Ask the emulator to enable tracing."""
        self.dp.set_tracing(True)

    def stop(self) -> None:
        """Ask the emulator to disable tracing."""
        self.dp.set_tracing(False)

    def step(self, address: int, size: int):
        """Handle one traced instruction; subclasses must implement this.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    def flush(self) -> None:
        """Flush buffered trace output (nothing to do in the base class)."""
        return None

    def close(self) -> None:
        """Release trace resources (nothing to do in the base class)."""
        return None

class TextTrace(AbstractTrace):
    """Trace that writes one text line per executed instruction to a file.

    The output file is opened by start() (only if it is not already open)
    and stays open until close() is called; stop() merely flushes it.
    """

    def __init__(self, dp: "Dumpulator", filename: str):
        """Remember the output path and create the 32/64-bit disassemblers.

        Args:
            dp: emulator instance being traced.
            filename: path of the text file start() will open for writing.
        """
        super().__init__(dp)
        self.__filename = filename
        self.__fp: Optional[TextIOBase] = None
        # TODO: multiple cs instances per segment
        self.__cs32 = Cs(CS_ARCH_X86, CS_MODE_32)
        self.__cs32.detail = True
        self.__cs64 = Cs(CS_ARCH_X86, CS_MODE_64)
        self.__cs64.detail = True

    @staticmethod
    def _get_regs(instr, include_write=False):
        """Collect the names of registers relevant to *instr*, in order.

        Args:
            instr: disassembled instruction exposing id, operands,
                regs_read, regs_write and reg_name().
            include_write: when True, also report registers that are only
                written (the destination of MOV/MOVZX/LEA and everything in
                instr.regs_write).

        Returns:
            An OrderedDict whose keys are register names (values are None),
            in first-seen order without duplicates.
        """
        regs = OrderedDict()
        operands = instr.operands
        if instr.id != X86_INS_NOP:
            for position, op in enumerate(operands):
                if op.type == CS_OP_REG:
                    # The first operand of these instructions is overwritten
                    # without being read first.
                    is_write_op = position == 0 and instr.id in (X86_INS_MOV, X86_INS_MOVZX, X86_INS_LEA)
                    # Fixed: the old test (`not is_write_op and not include_write`)
                    # dropped *every* register operand when include_write was set.
                    if include_write or not is_write_op:
                        regs[instr.reg_name(op.value.reg)] = None
                elif op.type == CS_OP_MEM:
                    mem = op.value.mem
                    # 0 presumably means "no register" (verify against capstone);
                    # RIP is skipped as well.
                    for mem_reg in (mem.base, mem.index):
                        if mem_reg not in (0, X86_REG_RIP):
                            regs[instr.reg_name(mem_reg)] = None
        for reg in instr.regs_read:
            regs[instr.reg_name(reg)] = None
        if include_write:
            for reg in instr.regs_write:
                regs[instr.reg_name(reg)] = None
        return regs

    def step(self, address: int, size: int):
        """Write the trace line for the instruction at *address*.

        Line layout: ``<address>[ <export or module name>]|<disassembly>``
        followed by ``|reg=value`` fields, a ``return_address`` field for
        calls and a ``sequence_id`` field for syscall/sysenter.  If the
        bytes cannot be read or decoded, a ``???`` placeholder with the raw
        bytes is written instead.

        The output file must be open (see start()); otherwise the write on
        the missing file object fails.

        Args:
            address: address of the instruction about to execute.
            size: instruction size reported by the emulator; at most 15
                bytes are read for disassembly.
        """
        dp = self.dp
        code = b""
        try:
            code = dp.read(address, min(size, 15))
            # Pick the decoder matching the current code segment.
            cs = self.__cs64 if dp.regs.cs == windows_user_segment.cs else self.__cs32
            instr = next(cs.disasm(code, address, 1))
        except (StopIteration, IndexError):
            # StopIteration: unsupported instruction; IndexError: likely invalid memory
            instr = None

        fp = self.__fp
        fp.write(hex(address))

        address_name = dp.exports.get(address, "")
        module = ""
        # Only look the module up (and report it) when execution has left
        # the module seen on the previous step.
        if not (dp.last_module and address in dp.last_module):
            dp.last_module = dp.modules.find(address)
            if dp.last_module:
                module = dp.last_module.name

        # An export name takes precedence over a bare module name.
        if address_name:
            fp.write(" ")
            fp.write(address_name)
        elif module:
            fp.write(" ")
            fp.write(module)
        fp.write("|")

        if instr is not None:
            fp.write(instr.mnemonic)
            if instr.op_str:
                fp.write(" ")
                fp.write(instr.op_str)
            for reg in TextTrace._get_regs(instr):
                fp.write(f"|{reg}={hex(getattr(dp.regs, reg))}")
            if instr.mnemonic == "call":
                # print return address
                ret_address = address + instr.size
                fp.write(f"|return_address={hex(ret_address)}")
            elif instr.mnemonic in {"syscall", "sysenter"}:
                fp.write(f"|sequence_id=[{dp.sequence_id}]")
        else:
            fp.write(f"??? (code: {code.hex()}, size: {hex(size)})")
        fp.write("\n")

    def start(self):
        """Open the output file if it is not open yet, then enable tracing."""
        if self.__fp is None:
            self.__fp = open(self.__filename, "w")
        super().start()

    def stop(self):
        """Flush pending output and disable tracing; the file stays open."""
        self.flush()
        super().stop()

    def close(self):
        """Close the output file, if open, and forget the handle."""
        if self.__fp is not None:
            self.__fp.close()
            self.__fp = None

    def flush(self):
        """Flush the output file if it is open."""
        if self.__fp is not None:
            self.__fp.flush()

class Dumpulator(Architecture):
def __init__(self, minidump_file, *, trace=False, quiet=False, thread_id=None, debug_logs=False):
self._quiet = quiet
Expand All @@ -289,20 +414,10 @@ def __init__(self, minidump_file, *, trace=False, quiet=False, thread_id=None, d
super().__init__(type(thread.ContextObject) is not minidump.WOW64_CONTEXT)
self.addr_mask = 0xFFFFFFFFFFFFFFFF if self._x64 else 0xFFFFFFFF

if trace:
self.trace = open(minidump_file + ".trace", "w")
else:
self.trace = None

self.last_module: Optional[Module] = None

self._uc = Uc(UC_ARCH_X86, UC_MODE_64)

# TODO: multiple cs instances per segment
mode = CS_MODE_64 if self._x64 else CS_MODE_32
self.cs = Cs(CS_ARCH_X86, mode)
self.cs.detail = True

self.regs = Registers(self._uc, self._x64)
self._pages = LazyPageManager(UnicornPageManager(self._uc))
self.memory = MemoryManager(self._pages)
Expand Down Expand Up @@ -332,6 +447,31 @@ def __init__(self, minidump_file, *, trace=False, quiet=False, thread_id=None, d
print("Memory map:")
self.print_memory()

self._trace_hook: Optional[int] = None
if isinstance(trace, AbstractTrace):
self.trace = trace
else:
self.trace = TextTrace(self, minidump_file + ".trace")
if trace:
self.trace.start()

# TODO: support start/end
def set_tracing(self, enabled: bool):
    """Install or remove the per-instruction trace hook.

    Enabling when a hook is already installed, or disabling when none is
    installed, does nothing.

    Args:
        enabled: True to install the code hook, False to remove it.
    """
    if not enabled:
        installed = self._trace_hook
        if installed is not None:
            self._uc.hook_del(installed)
            self._trace_hook = None
        return

    if self._trace_hook is not None:
        # Already tracing; keep the existing hook.
        return

    def _on_instruction(uc, address, size, userdata):
        # Forward the instruction to the active trace; on any failure stop
        # both the trace and the emulator before propagating the error.
        try:
            self.trace.step(address, min(size, 15))
        except BaseException as e:
            self.trace.stop()
            self.stop()
            raise e

    self._trace_hook = self._uc.hook_add(UC_HOOK_CODE, _on_instruction)

def print_memory(self):
regions = self.memory.map()
regions.pop() # remove the last free region
Expand Down Expand Up @@ -754,8 +894,6 @@ def push64(value):
self._uc.hook_add(UC_HOOK_MEM_INVALID, _hook_mem, user_data=self)
self._uc.hook_add(UC_HOOK_INTR, _hook_interrupt, user_data=self)
self._uc.hook_add(UC_HOOK_INSN_INVALID, _hook_invalid, user_data=self)
if self.trace:
self._uc.hook_add(UC_HOOK_CODE, _hook_code, user_data=self)

def _all_exports(self):
exports: Dict[int, str] = {}
Expand Down Expand Up @@ -1298,6 +1436,7 @@ def _hook_code_exception(uc: Uc, address, size, dp: Dumpulator):
dp.error(f"Exception during unicorn hook, please report this as a bug")
raise err

# TODO: figure out why when you start executing at 0 this callback is triggered more than once
def _hook_mem(uc: Uc, access, address, size, value, dp: Dumpulator):
if dp._pages.handle_lazy_page(address, min(size, PAGE_SIZE)):
dp.debug(f"committed lazy page {hex(address)}[{hex(size)}] (cip: {hex(dp.regs.cip)})")
Expand All @@ -1310,7 +1449,11 @@ def _hook_mem(uc: Uc, access, address, size, value, dp: Dumpulator):
if dp._exception.final and access in fetch_accesses:
dp.info(f"fetch from {hex(address)}[{size}] already reported")
return False
# TODO: figure out why when you start executing at 0 this callback is triggered more than once

# TODO: modify this for start/end support
# We would have to be certain that we were tracing on the previous(?) instruction
is_tracing_range = dp._trace_hook is not None

try:
violation = {
UC_MEM_READ_UNMAPPED: MemoryViolation.ReadUnmapped,
Expand All @@ -1336,7 +1479,7 @@ def _hook_mem(uc: Uc, access, address, size, value, dp: Dumpulator):
exception.tb_icount = tb.icount

# Print exception info
final = dp.trace or dp._exception.code_hook_h is not None
final = is_tracing_range or dp._exception.code_hook_h is not None
info = "final" if final else "initial"
if access == UC_MEM_READ_UNMAPPED:
dp.error(f"{info} unmapped read from {hex(address)}[{hex(size)}], cip = {hex(dp.regs.cip)}, exception: {exception}")
Expand All @@ -1361,7 +1504,7 @@ def _hook_mem(uc: Uc, access, address, size, value, dp: Dumpulator):

if final:
# Make sure this is the same exception we expect
if not dp.trace:
if not is_tracing_range:
assert violation == dp._exception.memory_violation
assert address == dp._exception.memory_address
assert size == dp._exception.memory_size
Expand Down Expand Up @@ -1406,77 +1549,6 @@ def _hook_mem(uc: Uc, access, address, size, value, dp: Dumpulator):
except Exception as err:
raise err

def _get_regs(instr, include_write=False):
regs = OrderedDict()
operands = instr.operands
if instr.id != X86_INS_NOP:
for i in range(0, len(operands)):
op = operands[i]
if op.type == CS_OP_REG:
is_write_op = (i == 0 and instr.id in [X86_INS_MOV, X86_INS_MOVZX, X86_INS_LEA])
if not is_write_op and not include_write:
regs[instr.reg_name(op.value.reg)] = None
elif op.type == CS_OP_MEM:
if op.value.mem.base not in [0, X86_REG_RIP]:
regs[instr.reg_name(op.value.mem.base)] = None
if op.value.mem.index not in [0, X86_REG_RIP]:
regs[instr.reg_name(op.value.mem.index)] = None
for reg in instr.regs_read:
regs[instr.reg_name(reg)] = None
if include_write:
for reg in instr.regs_write:
regs[instr.reg_name(reg)] = None
return regs

def _hook_code(uc: Uc, address, size, dp: Dumpulator):
    """Code hook that appends one line per instruction to dp.trace.

    Line layout: ``<address>[ <export or module name>]|<disassembly>``
    followed by ``|reg=value`` fields, a ``return_address`` field for calls
    and a ``sequence_id`` field for syscall/sysenter.  If the bytes cannot
    be read or decoded, a ``???`` placeholder with the raw bytes is written.

    Args:
        uc: emulator handle passed by the hook machinery (unused here).
        address: address of the instruction about to execute.
        size: reported instruction size; at most 15 bytes are read.
        dp: emulator wrapper providing memory, registers, module and export
            lookups, and the open trace file.

    Raises:
        KeyboardInterrupt, SystemExit: re-raised after stopping emulation.
    """
    try:
        raw = b""
        try:
            raw = dp.read(address, min(size, 15))
            insn = next(dp.cs.disasm(raw, address, 1))
        except (StopIteration, IndexError):
            # StopIteration: unsupported instruction; IndexError: likely invalid memory
            insn = None

        label = dp.exports.get(address, "")

        # Only look the module up (and report it) when execution has left
        # the module seen on the previous step.
        module_name = ""
        if not (dp.last_module and address in dp.last_module):
            dp.last_module = dp.modules.find(address)
            if dp.last_module:
                module_name = dp.last_module.name

        # An export name takes precedence over a bare module name.
        if label:
            label = " " + label
        elif module_name:
            label = " " + module_name

        pieces = [f"{hex(address)}{label}|"]
        if insn is None:
            pieces.append(f"??? (code: {raw.hex()}, size: {hex(size)})")
        else:
            pieces.append(insn.mnemonic)
            if insn.op_str:
                pieces.append(" ")
                pieces.append(insn.op_str)
            for reg in _get_regs(insn):
                pieces.append(f"|{reg}={hex(dp.regs.__getattr__(reg))}")
            if insn.mnemonic == "call":
                # print return address
                return_to = address + insn.size
                pieces.append(f"|return_address={hex(return_to)}")
            elif insn.mnemonic in {"syscall", "sysenter"}:
                pieces.append(f"|sequence_id=[{dp.sequence_id}]")
        pieces.append("\n")
        dp.trace.write("".join(pieces))
    except (KeyboardInterrupt, SystemExit) as e:
        # Stop emulation before letting the interrupt propagate.
        dp.stop()
        raise e

def _unicode_string_to_string(dp: Dumpulator, arg: P[UNICODE_STRING]):
try:
return arg[0].read_str()
Expand Down Expand Up @@ -1525,8 +1597,7 @@ def _arg_type_string(arg):
return type(arg).__name__

def _hook_interrupt(uc: Uc, number, dp: Dumpulator):
if dp.trace:
dp.trace.flush()
dp.trace.flush()
try:
# Extract exception information
exception = UnicornExceptionInfo()
Expand Down Expand Up @@ -1566,8 +1637,7 @@ def _hook_interrupt(uc: Uc, number, dp: Dumpulator):

def _hook_syscall(uc: Uc, dp: Dumpulator):
# Flush the trace for easier debugging
if dp.trace is not None:
dp.trace.flush()
dp.trace.flush()

# Extract the table and function number from eax
service_number = dp.regs.cax & 0xffff
Expand Down Expand Up @@ -1677,16 +1747,18 @@ def _emulate_unsupported_instruction(dp: Dumpulator, instr: CsInsn):

def _hook_invalid(uc: Uc, dp: Dumpulator):
address = dp.regs.cip
if dp.trace:
dp.trace.flush()
dp.trace.flush()
# HACK: unicorn cannot gracefully exit in all contexts
if dp.stopped:
dp.error(f"terminating emulation...")
return False
dp.error(f"invalid instruction at {hex(address)}")
try:
mode = CS_MODE_64 if dp.regs.cs == windows_user_segment.cs else CS_MODE_32
cs = Cs(CS_ARCH_X86, mode)
cs.detail = True
code = dp.read(address, 15)
instr = next(dp.cs.disasm(code, address, 1))
instr = next(cs.disasm(code, address, 1))
if _emulate_unsupported_instruction(dp, instr):
# Resume execution with a context switch
assert dp._exception.type == ExceptionType.NoException
Expand Down

0 comments on commit bbbc3ca

Please sign in to comment.