Skip to content

Commit

Permalink
Add Manticore native State-specific hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
ekilmer committed Jul 17, 2020
1 parent 8a8f35c commit bb52935
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 10 deletions.
26 changes: 18 additions & 8 deletions manticore/native/manticore.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import os
import shlex
import time
from typing import Callable, Optional
import sys
from elftools.elf.elffile import ELFFile
from elftools.elf.sections import SymbolTableSection

from .state import State
from .state import HookCallback, State
from ..core.manticore import ManticoreBase
from ..core.smtlib import ConstraintSet
from ..core.smtlib.solver import SelectedSolver, issymbolic
Expand Down Expand Up @@ -229,19 +230,25 @@ def decorator(f):

return decorator

def add_hook(self, pc, callback, after=False):
def add_hook(
self,
pc: Optional[int],
callback: HookCallback,
after: bool = False,
state: Optional[State] = None,
):
"""
Add a callback to be invoked on executing a program counter. Pass `None`
for pc to invoke callback on every instruction. `callback` should be a callable
that takes one :class:`~manticore.core.state.State` argument.
that takes one :class:`~manticore.native.state.State` argument.
:param pc: Address of instruction to hook
:type pc: int or None
:param callable callback: Hook function
:param callback: Hook function
:param after: Hook after PC executes?
:param state: Optionally, add hook for this state only, else all states
"""
if not (isinstance(pc, int) or pc is None):
raise TypeError(f"pc must be either an int or None, not {pc.__class__.__name__}")
else:
if state is None:
# add hook to all states
hooks, when, hook_callback = (
(self._hooks, "will_execute_instruction", self._hook_callback)
if not after
Expand All @@ -250,6 +257,9 @@ def add_hook(self, pc, callback, after=False):
hooks.setdefault(pc, set()).add(callback)
if hooks:
self.subscribe(when, hook_callback)
else:
# only hook for the specified state
state.add_hook(pc, callback, after)

def _hook_callback(self, state, pc, instruction):
"Invoke all registered generic hooks"
Expand Down
146 changes: 144 additions & 2 deletions manticore/native/state.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import copy
from collections import namedtuple
from typing import Any, NamedTuple
from typing import Any, Callable, Dict, NamedTuple, Optional, Set, Tuple, Union

from .cpu.disasm import Instruction
from .memory import ConcretizeMemory, MemoryException
from .. import issymbolic
from ..core.state import StateBase, Concretize, TerminateState
from ..native.memory import ConcretizeMemory, MemoryException
from ..core.smtlib import Expression


HookCallback = Callable[[State], None]


class CheckpointData(NamedTuple):
Expand All @@ -11,6 +18,141 @@ class CheckpointData(NamedTuple):


class State(StateBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._hooks: Dict[Optional[int], Set[HookCallback]] = {}
self._after_hooks: Dict[Optional[int], Set[HookCallback]] = {}

def __getstate__(self):
state = super().__getstate__()
state["hooks"] = self._hooks
state["after_hooks"] = self._after_hooks
return state

def __setstate__(self, state):
super().__setstate__(state)
self._hooks = state["hooks"]
self._after_hooks = state["after_hooks"]
self._resub_hooks()

def __enter__(self):
new_state = super().__enter__()
new_state._hooks = copy.copy(self._hooks)
new_state._after_hooks = copy.copy(self._after_hooks)
return new_state

def _get_hook_context(
self, after: bool = True
) -> Tuple[Dict[Optional[int], Set[HookCallback]], str, function]:
"""
Internal helper function to get hook context information.
:param after: Whether we want info pertaining to hooks after instruction executes or before
:return: Information for hooks after or before:
- set of hooks for specified after or before
- string of callback event
- State function that handles the callback
"""
return (
(self._hooks, "will_execute_instruction", self._state_hook_callback)
if not after
else (self._after_hooks, "did_execute_instruction", self._state_after_hook_callback)
)

def remove_hook(self, pc: Optional[int], callback: HookCallback, after: bool = False) -> bool:
"""
Remove a callback with the specified properties
:param pc: Address of instruction to remove from
:param callback: The callback function that was at the address
:param after: Whether it was after instruction executed or not
:return: Whether it was removed
"""
hooks, when, hook_callback = self._get_hook_context(after)
cbs = hooks.get(pc, set())
if callback in cbs:
cbs.remove(callback)
else:
return False

if len(hooks.get(pc, set())) == 0:
del hooks[pc]

return True

def add_hook(self, pc: Optional[int], callback: HookCallback, after: bool = False) -> None:
"""
Add a callback to be invoked on executing a program counter. Pass `None`
for pc to invoke callback on every instruction. `callback` should be a callable
that takes one :class:`~manticore.native.state.State` argument.
:param pc: Address of instruction to hook
:param callback: Hook function
:param after: Hook after PC executes?
:param state: Add hook to this state
"""
hooks, when, hook_callback = self._get_hook_context(after)
hooks.setdefault(pc, set()).add(callback)
if hooks:
self.subscribe(when, hook_callback)

def _resub_hooks(self) -> None:
"""
Internal helper function to resubscribe hook callback events when the
state is active again.
"""
# TODO: check if the lists actually have hooks
_, when, hook_callback = self._get_hook_context(False)
self.subscribe(when, hook_callback)

_, when, hook_callback = self._get_hook_context(True)
self.subscribe(when, hook_callback)

def _state_hook_callback(self, pc: Union[int, Expression], _instruction: Instruction) -> None:
"""
Invoke all registered State hooks before the instruction executes.
:param pc: Address where the hook should run
:param _instruction: Instruction at this PC
"""
# Ignore symbolic pc.
# TODO(yan): Should we ask the solver if any of the hooks are possible,
# and execute those that are?
if issymbolic(pc):
return
assert isinstance(pc, int)

# Prevent crash if removing hook(s) during a callback
tmp_hooks = copy.deepcopy(self._hooks)

# Invoke all pc-specific hooks
for cb in tmp_hooks.get(pc, []):
cb(self)

# Invoke all pc-agnostic hooks
for cb in tmp_hooks.get(None, []):
cb(self)

def _state_after_hook_callback(
self, last_pc: int, _pc: Union[int, Expression], _instruction: Instruction
):
"""
Invoke all registered State hooks after the instruction executes.
:param last_pc: Address where the hook should run after instruction execution
:param _pc: Next address to execute
:param _instruction: Instruction at this last_pc
"""

# Prevent crash if removing hook(s) during a callback
tmp_hooks = copy.deepcopy(self._after_hooks)
# Invoke all pc-specific hooks
for cb in tmp_hooks.get(last_pc, []):
cb(self)

# Invoke all pc-agnostic hooks
for cb in tmp_hooks.get(None, []):
cb(self)

@property
def cpu(self):
"""
Expand Down

0 comments on commit bb52935

Please sign in to comment.