import logging

from contextlib import contextmanager

from .state_merging import merge_constraints, is_merge_possible, merge
from ..utils.helpers import issymbolic

logger = logging.getLogger(__name__)


class Plugin:
    def __init__(self):
        self.manticore = None

    @contextmanager
    def locked_context(self, key=None, value_type=list):
        """
        A context manager that provides safe parallel access to the global Manticore context.
        This should be used to access the global Manticore context when parallel analysis is
        activated. Code within the `with` block is executed atomically, so access to shared
        variables should occur within it.
        """
        plugin_context_name = str(type(self))
        with self.manticore.locked_context(plugin_context_name, dict) as context:
            assert value_type in (list, dict, set)
            ctx = context.get(key, value_type())
            yield ctx
            context[key] = ctx

    @property
    def context(self):
        ''' Convenient access to shared context '''
        plugin_context_name = str(type(self))
        if plugin_context_name not in self.manticore.context:
            self.manticore.context[plugin_context_name] = {}
        return self.manticore.context[plugin_context_name]

    def on_register(self):
        ''' Called by parent manticore on registration '''
        pass

    def on_unregister(self):
        ''' Called by parent manticore on un-registration '''
        pass


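# Illustrative sketch (not part of the original module): how a Plugin subclass might use
# `locked_context` from a callback to update plugin-private shared state safely under
# parallel analysis. The `HitCounter` name and the 'hits' key are made up for this example.
#
#   class HitCounter(Plugin):
#       def did_execute_instruction_callback(self, state, pc, target_pc, instruction):
#           with self.locked_context('hits', dict) as hits:
#               hits[pc] = hits.get(pc, 0) + 1
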
def _dict_diff(d1, d2):
    '''
    Produce a dict that includes all the keys of d2 whose values differ from those in d1, as well as the keys
    of d2 that aren't in d1 at all.

    :param dict d1: First dict
    :param dict d2: Dict to compare with
    :rtype: dict
    '''
    d = {}
    for key in set(d1).intersection(set(d2)):
        if d2[key] != d1[key]:
            d[key] = d2[key]
    for key in set(d2).difference(set(d1)):
        d[key] = d2[key]
    return d


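# Worked example: only keys of d2 whose values differ from d1, plus keys new in d2, survive:
#   _dict_diff({'RAX': 1, 'RBX': 2}, {'RAX': 1, 'RBX': 3, 'RCX': 4}) == {'RBX': 3, 'RCX': 4}
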
class Tracer(Plugin):
    def did_execute_instruction_callback(self, state, pc, target_pc, instruction):
        state.context.setdefault('trace', []).append(pc)


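# Usage sketch (illustrative, not part of the original module): register Tracer on a
# Manticore instance and read each state's recorded trace afterwards. The '/bin/ls' target
# and the `terminated_states` iteration are assumptions for this example.
#
#   m = Manticore('/bin/ls')
#   m.register_plugin(Tracer())
#   m.run()
#   for st in m.terminated_states:
#       print([hex(pc) for pc in st.context.get('trace', [])])
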
class ExtendedTracer(Plugin):
    def __init__(self):
        '''
        Record a detailed execution trace
        '''
        super().__init__()
        self.last_dict = {}
        self.current_pc = None
        self.context_key = 'e_trace'

    def register_state_to_dict(self, cpu):
        d = {}
        for reg in cpu.canonical_registers:
            val = cpu.read_register(reg)
            d[reg] = val if not issymbolic(val) else '<sym>'
        return d

    def will_execute_instruction_callback(self, state, pc, instruction):
        self.current_pc = pc

    def did_execute_instruction_callback(self, state, pc, target_pc, instruction):
        reg_state = self.register_state_to_dict(state.cpu)
        entry = {
            'type': 'regs',
            'values': _dict_diff(self.last_dict, reg_state)
        }
        self.last_dict = reg_state
        state.context.setdefault(self.context_key, []).append(entry)

    def will_read_memory_callback(self, state, where, size):
        if self.current_pc == where:
            return
        # print(f'will_read_memory {where:x} {size!r}, current_pc {self.current_pc:x}')

    def did_read_memory_callback(self, state, where, value, size):
        if self.current_pc == where:
            return
        # print(f'did_read_memory {where:x} {value!r} {size!r}, current_pc {self.current_pc:x}')

    def will_write_memory_callback(self, state, where, value, size):
        if self.current_pc == where:
            return
        # print(f'will_write_memory {where:x} {value!r} {size!r}, current_pc {self.current_pc:x}')

    def did_write_memory_callback(self, state, where, value, size):
        if self.current_pc == where:
            raise Exception
            return

        entry = {
            'type': 'mem_write',
            'where': where,
            'value': value,
            'size': size
        }
        state.context.setdefault(self.context_key, []).append(entry)


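# The recorded 'e_trace' list interleaves two kinds of entries, as built by the callbacks above:
#   {'type': 'regs',      'values': {<register>: <concrete value or '<sym>'>, ...}}
#   {'type': 'mem_write', 'where': <address>, 'value': <value>, 'size': <size>}
# A Follower (below) consumes exactly this sequence via get_next('regs') / get_next('mem_write').
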
class Follower(Plugin):
    def __init__(self, trace):
        self.index = 0
        self.trace = trace
        self.last_instruction = None
        self.symbolic_ranges = []
        self.active = True
        super().__init__()

    def add_symbolic_range(self, pc_start, pc_end):
        self.symbolic_ranges.append((pc_start, pc_end))

    def get_next(self, type):
        event = self.trace[self.index]
        assert event['type'] == type
        self.index += 1
        return event

    def did_write_memory_callback(self, state, where, value, size):
        if not self.active:
            return
        write = self.get_next('mem_write')

        if not issymbolic(value):
            return

        assert write['where'] == where and write['size'] == size
        # state.constrain(value == write['value'])

    def did_execute_instruction_callback(self, state, last_pc, pc, insn):
        if not self.active:
            return
        event = self.get_next('regs')

        self.last_instruction = event['values']
        if issymbolic(pc):
            state.constrain(state.cpu.RIP == self.last_instruction['RIP'])
        else:
            for start, stop in self.symbolic_ranges:
                if start <= pc <= stop:
                    self.active = False


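# Illustrative record-and-replay sketch (not part of the original module): capture a trace
# with ExtendedTracer in one run, then hand it to a Follower in a second run. How the
# 'e_trace' list is pulled out of the finished state, and the address range used, are
# assumptions for this example.
#
#   m1 = Manticore('/bin/ls')
#   m1.register_plugin(ExtendedTracer())
#   m1.run()
#   recorded_trace = ...  # extract the 'e_trace' list from a chosen state's context
#
#   follower = Follower(recorded_trace)
#   follower.add_symbolic_range(0x400000, 0x400fff)  # made-up range to stop following in
#   m2 = Manticore('/bin/ls')
#   m2.register_plugin(follower)
#   m2.run()
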
class RecordSymbolicBranches(Plugin):
    def did_execute_instruction_callback(self, state, last_pc, target_pc, instruction):
        if state.context.get('forking_pc', False):
            branches = state.context.setdefault('branches', {})
            branch = (last_pc, target_pc)
            if branch in branches:
                branches[branch] += 1
            else:
                branches[branch] = 1
            state.context['forking_pc'] = False

        if issymbolic(target_pc):
            state.context['forking_pc'] = True


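# After a run, each state's context['branches'] maps (last_pc, target_pc) edges taken right
# after a symbolic program counter to the number of times they were taken, e.g.
# {(0x400a10, 0x400a30): 2} (addresses made up for illustration).
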
class InstructionCounter(Plugin):
    def will_terminate_state_callback(self, state, state_id, ex):
        if state is None:  # FIXME Can it be None?
            return
        state_instructions_count = state.context.get('instructions_count', 0)

        with self.manticore.locked_context() as manticore_context:
            manticore_instructions_count = manticore_context.get('instructions_count', 0)
            manticore_context['instructions_count'] = manticore_instructions_count + state_instructions_count

    def did_execute_instruction_callback(self, state, prev_pc, target_pc, instruction):
        address = prev_pc
        if not issymbolic(address):
            count = state.context.get('instructions_count', 0)
            state.context['instructions_count'] = count + 1

    def did_finish_run_callback(self):
        _shared_context = self.manticore.context
        instructions_count = _shared_context.get('instructions_count', 0)
        logger.info('Instructions executed: %d', instructions_count)


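# Usage sketch (illustrative): with an InstructionCounter registered, the aggregated total
# ends up in the shared Manticore context once the run finishes:
#   m.register_plugin(InstructionCounter())
#   m.run()
#   total = m.context.get('instructions_count', 0)
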
class Visited(Plugin):
    def __init__(self, coverage_file='visited.txt'):
        super().__init__()
        self.coverage_file = coverage_file

    def will_terminate_state_callback(self, state, state_id, ex):
        if state is None:
            return
        state_visited = state.context.get('visited_since_last_fork', set())
        with self.manticore.locked_context() as manticore_context:
            manticore_visited = manticore_context.get('visited', set())
            manticore_context['visited'] = manticore_visited.union(state_visited)

    def will_fork_state_callback(self, state, expression, values, policy):
        state_visited = state.context.get('visited_since_last_fork', set())
        with self.manticore.locked_context() as manticore_context:
            manticore_visited = manticore_context.get('visited', set())
            manticore_context['visited'] = manticore_visited.union(state_visited)
        state.context['visited_since_last_fork'] = set()

    def did_execute_instruction_callback(self, state, prev_pc, target_pc, instruction):
        state.context.setdefault('visited_since_last_fork', set()).add(prev_pc)
        state.context.setdefault('visited', set()).add(prev_pc)

    def did_finish_run_callback(self):
        _shared_context = self.manticore.context
        executor_visited = _shared_context.get('visited', set())
        # Fixme this is duplicated?
        if self.coverage_file is not None:
            with self.manticore._output.save_stream(self.coverage_file) as f:
                for m in executor_visited:
                    f.write(f"0x{m:016x}\n")
        logger.info('Coverage: %d different instructions executed', len(executor_visited))


# TODO document all callbacks
class ExamplePlugin(Plugin):
    def will_open_transaction_callback(self, state, tx):
        logger.info('will open a transaction %r %r', state, tx)

    def will_close_transaction_callback(self, state, tx):
        logger.info('will close a transaction %r %r', state, tx)

    def will_decode_instruction_callback(self, state, pc):
        logger.info('will_decode_instruction %r %r', state, pc)

    def will_execute_instruction_callback(self, state, pc, instruction):
        logger.info('will_execute_instruction %r %r %r', state, pc, instruction)

    def did_execute_instruction_callback(self, state, pc, target_pc, instruction):
        logger.info('did_execute_instruction %r %r %r %r', state, pc, target_pc, instruction)

    def will_start_run_callback(self, state):
        ''' Called once at the beginning of the run.
            state is the initial root state
        '''
        logger.info('will_start_run')

    def did_finish_run_callback(self):
        logger.info('did_finish_run')

    def will_fork_state_callback(self, parent_state, expression, solutions, policy):
        logger.info('will_fork_state %r %r %r %r', parent_state, expression, solutions, policy)

    def did_fork_state_callback(self, child_state, expression, new_value, policy):
        logger.info('did_fork_state %r %r %r %r', child_state, expression, new_value, policy)

    def did_load_state_callback(self, state, state_id):
        logger.info('did_load_state %r %r', state, state_id)

    def did_enqueue_state_callback(self, state, state_id):
        logger.info('did_enqueue_state %r %r', state, state_id)

    def will_terminate_state_callback(self, state, state_id, exception):
        logger.info('will_terminate_state %r %r %r', state, state_id, exception)

    def will_generate_testcase_callback(self, state, testcase, message):
        logger.info('will_generate_testcase %r %r %r', state, testcase, message)

    def will_read_memory_callback(self, state, where, size):
        logger.info('will_read_memory %r %r %r', state, where, size)

    def did_read_memory_callback(self, state, where, value, size):
        logger.info('did_read_memory %r %r %r %r', state, where, value, size)

    def will_write_memory_callback(self, state, where, value, size):
        logger.info('will_write_memory %r %r %r', state, where, value, size)

    def did_write_memory_callback(self, state, where, value, size):
        logger.info('did_write_memory %r %r %r %r', state, where, value, size)

    def will_read_register_callback(self, state, register):
        logger.info('will_read_register %r %r', state, register)

    def did_read_register_callback(self, state, register, value):
        logger.info('did_read_register %r %r %r', state, register, value)

    def will_write_register_callback(self, state, register, value):
        logger.info('will_write_register %r %r %r', state, register, value)

    def did_write_register_callback(self, state, register, value):
        logger.info('did_write_register %r %r %r', state, register, value)


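# Usage sketch (illustrative): ExamplePlugin just logs every supported event, which is a
# convenient way to discover which callbacks fire for a given target:
#   m.register_plugin(ExamplePlugin())
#   m.run()
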
class Merger(Plugin):
    def load_state(self, state_id, delete=False):
        '''
        Loads a state in Manticore with state-id = `state_id` by using the corresponding API in Executor
        :param state_id: state-id for the state that is being loaded
        :param delete: if set to True, deletes the state with state-id = `state_id`
        :return: the loaded State object
        '''
        return self.manticore._executor._load_state(state_id, delete)

    def delete_state(self, state_id):
        '''
        Deletes a state in Manticore with state-id = `state_id` by using the corresponding API in Executor
        :param state_id: state-id for the state that is being deleted
        :return: None
        '''
        return self.manticore._executor._delete_state(state_id)

    def replace_state(self, state_id, state):
        '''
        Replaces a state in Manticore with state-id = `state_id` by using the corresponding API in Executor
        :param state_id: state-id for the state that is being replaced
        :param state: State object that is replacing the existing state with `state_id`
        :return: None
        '''
        return self.manticore._executor._replace_state(state_id, state)

    def did_enqueue_state_callback(self, state_id, state):
        '''
        Maintains a `cpu_stateid_dict` in the shared context that maps program counter values to the list of
        state-ids for states that will execute the instruction at that program counter next
        :param state_id: id of the state that was just enqueued
        :param state: State object that was just enqueued
        :return: None
        '''
        # When a new state is added to the list we record it here, so we do not have
        # to reload all states when we try to merge at the last PC.
        with self.locked_context('cpu_stateid_dict', dict) as cpu_stateid_dict:
            # As we may be running in a different process we need to access this
            # under a lock and over shared memory, like this.
            state_id_list = cpu_stateid_dict.get(state.cpu.PC, list())
            state_id_list.append(state_id)
            cpu_stateid_dict[state.cpu.PC] = state_id_list

    def will_load_state_callback(self, current_state_id):
        '''
        Checks if the state to be loaded (referenced by `current_state_id`) can be merged with another currently
        enqueued state and replaced by the merged state
        :param current_state_id: id of the state about to be loaded
        :return: None
        '''
        # When a state is loaded for exploration, check whether we can find it a
        # mate for merging.
        with self.locked_context('cpu_stateid_dict') as cpu_stateid_dict:
            # We hold the lock and work on a copy of the shared context
            merged_state = self.load_state(current_state_id)
            states_at_pc = cpu_stateid_dict.get(merged_state.cpu.PC, [])

            # Remove ourselves from the list of waiting states
            assert current_state_id in states_at_pc
            states_at_pc.remove(current_state_id)

            # Iterate over all remaining states that are waiting for exploration
            # at the same PC
            merged_ids = []
            for new_state_id in states_at_pc:
                new_state = self.load_state(new_state_id)
                (exp_merged_state, exp_new_state, merged_constraint) = merge_constraints(
                    merged_state.constraints, new_state.constraints)
                is_mergeable, reason = is_merge_possible(merged_state, new_state, merged_constraint)

                if is_mergeable:
                    # OK, we'll merge it!
                    merged_state = merge(merged_state, new_state, exp_merged_state, merged_constraint)

                    # Remove the vestigial links to the old state
                    self.delete_state(new_state_id)

                    merged_ids.append(new_state_id)
                    is_mergeable = "succeeded"
                else:
                    is_mergeable = "failed because of " + reason
                debug_string = "at PC = " + hex(merged_state.cpu.PC) + \
                               ", merge " + is_mergeable + " for state id = " + \
                               str(current_state_id) + " and " + str(new_state_id)
                print(debug_string)

            for i in merged_ids:
                states_at_pc.remove(i)

            cpu_stateid_dict[merged_state.cpu.PC] = states_at_pc

            # OK, so we have merged current_state_id with {merged_ids}
            # and removed all merged_ids from everywhere.
            # UGLY: we are replacing a state_id. This may be breaking caches in
            # the future.
            self.replace_state(current_state_id, merged_state)
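

# Usage sketch (illustrative): opportunistic state merging is enabled simply by registering
# the plugin; the merging itself happens in the enqueue/load callbacks above. Whether two
# states can be merged is decided by is_merge_possible() from .state_merging.
#   m.register_plugin(Merger())
#   m.run()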