Skip to content

Commit

Permalink
Merge branch 'master' into lgtm-fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Brad Larsen committed Apr 7, 2020
2 parents 2473412 + 924a139 commit a150f02
Show file tree
Hide file tree
Showing 13 changed files with 432 additions and 245 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- name: Set up Python 3.6
uses: actions/setup-python@v1
with:
Expand Down
2 changes: 1 addition & 1 deletion manticore/core/manticore.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ def generate_testcase(self, state, message="test", name="test"):
return testcase

@at_not_running
def register_plugin(self, plugin):
def register_plugin(self, plugin: Plugin):
# Global enumeration of valid events
assert isinstance(plugin, Plugin)
assert plugin not in self.plugins, "Plugin instance already registered"
Expand Down
10 changes: 8 additions & 2 deletions manticore/core/smtlib/solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,14 @@ def _send(self, cmd: str):
"""
# logger.debug('>%s', cmd)
try:
self._proc.stdout.flush()
self._proc.stdin.write(f"{cmd}\n")
if self._proc.stdout:
self._proc.stdout.flush()
else:
raise SolverError("Could not flush stdout: file descriptor is None")
if self._proc.stdin:
self._proc.stdin.write(f"{cmd}\n")
else:
raise SolverError("Could not write to stdin: file descriptor is None")
except IOError as e:
raise SolverError(str(e))

Expand Down
2 changes: 1 addition & 1 deletion manticore/core/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ForkState(Concretize):
in forked states.
"""

def __init__(self, message, expression, **kwargs):
def __init__(self, message, expression: Bool, **kwargs):
assert isinstance(expression, Bool), "Need a Bool to fork a state in two states"
super().__init__(message, expression, policy="ALL", **kwargs)

Expand Down
2 changes: 1 addition & 1 deletion manticore/core/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ def load_state(self, state_id, delete=True):
"""
return self._store.load_state(f"{self._prefix}{state_id:08x}{self._suffix}", delete=delete)

def save_state(self, state, state_id=None):
def save_state(self, state: StateBase, state_id=None):
"""
Save a state to storage, return identifier.
Expand Down
57 changes: 30 additions & 27 deletions manticore/native/cpu/abstractcpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from capstone.x86 import X86_REG_ENDING
from capstone.arm import ARM_REG_ENDING

from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)
register_logger = logging.getLogger(f"{__name__}.registers")

Expand Down Expand Up @@ -124,7 +126,7 @@ def __init__(self, parent):
scale = property(lambda self: self.parent.op.mem.scale)
disp = property(lambda self: self.parent.op.mem.disp)

def __init__(self, cpu, op):
def __init__(self, cpu: "Cpu", op):
"""
This encapsulates the arch-independent way to access instruction
operands and immediates based on the disassembler operand descriptor in
Expand All @@ -142,12 +144,12 @@ def __init__(self, cpu, op):
self.op = op
self.mem = Operand.MemSpec(self)

def _reg_name(self, reg_id):
def _reg_name(self, reg_id: int):
"""
Translates a register ID from the disassembler object into the
register name based on manticore's alias in the register file
:param int reg_id: Register ID
:param reg_id: Register ID
"""
# XXX: Support other architectures.
if (
Expand Down Expand Up @@ -495,18 +497,18 @@ class Cpu(Eventful):
"execute_syscall",
}

def __init__(self, regfile, memory, **kwargs):
def __init__(self, regfile: RegisterFile, memory, **kwargs):
assert isinstance(regfile, RegisterFile)
self._disasm = kwargs.pop("disasm", "capstone")
super().__init__(**kwargs)
self._regfile = regfile
self._memory = memory
self._instruction_cache = {}
self._instruction_cache: Dict[int, Any] = {}
self._icount = 0
self._last_pc = None
self._concrete = kwargs.pop("concrete", False)
self.emu = None
self._break_unicorn_at = None
self._break_unicorn_at: Optional[int] = None
self._delayed_event = False
if not hasattr(self, "disasm"):
self.disasm = init_disassembler(self._disasm, self.arch, self.mode)
Expand Down Expand Up @@ -730,7 +732,7 @@ def write_bytes(self, where: int, data, force: bool = False) -> None:
"""
Write a concrete or symbolic (or mixed) buffer to memory
:param int where: address to write to
:param where: address to write to
:param data: data to write
:type data: str or list
:param force: whether to ignore memory permissions
Expand Down Expand Up @@ -761,12 +763,12 @@ def write_bytes(self, where: int, data, force: bool = False) -> None:
for i in range(len(data)):
self.write_int(where + i, Operators.ORD(data[i]), 8, force)

def read_bytes(self, where, size, force=False):
def read_bytes(self, where: int, size: int, force: bool = False):
"""
Read from memory.
:param int where: address to read data from
:param int size: number of bytes
:param where: address to read data from
:param size: number of bytes
:param force: whether to ignore memory permissions
:return: data
:rtype: list[int or Expression]
Expand All @@ -776,13 +778,15 @@ def read_bytes(self, where, size, force=False):
result.append(Operators.CHR(self.read_int(where + i, 8, force)))
return result

def write_string(self, where, string, max_length=None, force=False):
def write_string(
self, where: int, string: str, max_length: Optional[int] = None, force: bool = False
) -> None:
"""
Writes a string to memory, appending a NULL-terminator at the end.
:param int where: Address to write the string to
:param str string: The string to write to memory
:param int max_length:
:param where: Address to write the string to
:param string: The string to write to memory
:param max_length:
The size in bytes to cap the string at, or None [default] for no
limit. This includes the NULL terminator.
Expand All @@ -795,17 +799,16 @@ def write_string(self, where, string, max_length=None, force=False):

self.write_bytes(where, string + "\x00", force)

def read_string(self, where, max_length=None, force=False):
def read_string(self, where: int, max_length: Optional[int] = None, force: bool = False) -> str:
"""
Read a NUL-terminated concrete buffer from memory. Stops reading at first symbolic byte.
:param int where: Address to read string from
:param int max_length:
:param where: Address to read string from
:param max_length:
The size in bytes to cap the string at, or None [default] for no
limit.
:param force: whether to ignore memory permissions
:return: string read
:rtype: str
"""
s = io.BytesIO()
while True:
Expand All @@ -822,43 +825,43 @@ def read_string(self, where, max_length=None, force=False):
where += 1
return s.getvalue().decode()

def push_bytes(self, data, force=False):
def push_bytes(self, data: str, force: bool = False):
"""
Write `data` to the stack and decrement the stack pointer accordingly.
:param str data: Data to write
:param data: Data to write
:param force: whether to ignore memory permissions
"""
self.STACK -= len(data)
self.write_bytes(self.STACK, data, force)
return self.STACK

def pop_bytes(self, nbytes, force=False):
def pop_bytes(self, nbytes: int, force: bool = False):
"""
Read `nbytes` from the stack, increment the stack pointer, and return
data.
:param int nbytes: How many bytes to read
:param nbytes: How many bytes to read
:param force: whether to ignore memory permissions
:return: Data read from the stack
"""
data = self.read_bytes(self.STACK, nbytes, force=force)
self.STACK += nbytes
return data

def push_int(self, value, force=False):
def push_int(self, value: int, force: bool = False):
"""
Decrement the stack pointer and write `value` to the stack.
:param int value: The value to write
:param value: The value to write
:param force: whether to ignore memory permissions
:return: New stack pointer
"""
self.STACK -= self.address_bit_size // 8
self.write_int(self.STACK, value, force=force)
return self.STACK

def pop_int(self, force=False):
def pop_int(self, force: bool = False):
"""
Read a value from the stack and increment the stack pointer.
Expand All @@ -879,11 +882,11 @@ def _wrap_operands(self, operands):
"""
raise NotImplementedError

def decode_instruction(self, pc):
def decode_instruction(self, pc: int):
"""
This will decode an instruction from memory pointed by `pc`
:param int pc: address of the instruction
:param pc: address of the instruction
"""
# No dynamic code!!! #TODO!
# Check if instruction was already decoded
Expand Down
Loading

0 comments on commit a150f02

Please sign in to comment.