# Mining Input Grammars

So far, the grammars we have seen have been mostly specified manually – that is, you (or the person knowing the input format) had to design and write a grammar in the first place.  While the grammars we have seen so far have been rather simple, creating a grammar for complex inoputs can involve quite some effort.  In this chapter, we therefore introduce techniques that automatically _mine_ grammars from programs – by executing the programs and observing how they process which parts of the input.  In conjunction with a grammar fuzzer, this allows us to (1) take a program, (2) extract its input grammar, and (3) fuzz it with high efficiency and effectiveness.

**Prerequisites**

* You should have read the [chapter on grammars](Grammars.ipynb).
* The [chapter on configuration fuzzing](ConfigurationFuzzer.ipynb) introduces grammar mining for configuration options, as well as observing variables and values during execution.
* The concept of parsing from [chapter on parsers](Parser.ipynb) is also useful.

Consider the `parse_vehicle()` and `process_inventory()`  methods from the [chapter on parsers](Parser.ipynb) (modified to remove printing)

In [None]:
def process_van(year, company, model):
    desc = "We have a %s %s van from %s vintage." % (company, model, year)
    iyear = int(year)
    if iyear > 2010:
        return "%s\nIt is a recent model!" % desc
    else:
        return "%s\nIt is an old but reliable model!" % desc


def process_car(year, company, model):
    desc = "We have a %s %s car from %s vintage." % (company, model, year)
    iyear = int(year)
    if iyear > 2016:
        return "%s\nIt is a recent model!" % desc
    else:
        return "%s\nIt is an old but reliable model!" % desc


def process_vehicle(vehicle):
    year, kind, company, model, *_ = vehicle.split(',')
    if kind == 'van':
        return process_van(year, company, model)
    elif kind == 'car':
        return process_car(year, company, model)
    else:
        raise Exception('Invalid entry')

The whole process is driven by a method `process_inventory()` defined below.

In [None]:
def process_inventory(inventory):
    result = []
    for vehicle in inventory.split('\n'):
        r = process_vehicle(vehicle)
        result.append(r)
    return "\n".join(result)

The method `process_inventory()` takes inputs of the following form.

In [None]:
INVENTORY = """\
1997,van,Ford,E350
2000,car,Mercury,Cougar
1999,car,Chevy,Venture\
"""
print(process_inventory(INVENTORY))

We found from the [chapter on parsers](Parser.ipynb) that coarse grammars do not work well for fuzzing when the input format includes details expressed only in code. That is, even though we have the formal specification of CSV files ([RFC 4180](https://tools.ietf.org/html/rfc4180)), the inventory system includes further rules as to what is expected at each index of the CSV file. The solution of simply recombining existing inputs, while practical, is incomplete. In particular, it relies on a formal input specification being available in the first place. However, we have no assurance that the program obeys the input specification given.

One of the ways out of this predicament is to interrogate the program under test as to what its input specification is. That is, if the program under test is written in a recursive descent style, with specific methods responsible for handling specific parts of the input, one can recover the parse tree, by observing the process of parsing. Further, one can recover a reasonable approximation of the grammar by abstraction from multiple input trees.

The idea is as follows
* The assumption (1) is that the program is written in such a fashion that specific methods are responsible for parsing specific fragments of the program. This includes almost all ad hoc parsers.
* We hook into the Python execution and observe how the input fragments are produced and named in different methods.
* Stitch the input fragments together in a tree structure to produce a parse tree.
* Abstract common elements from multiple parse trees to produce the Context Free Grammar of the input.

## A Simple Grammar Miner

Say we want to obtain the input grammar for the function `process_vehicle()`. We first collect the sample inputs for this function.

In [None]:
VEHICLES = INVENTORY.split('\n')

### The input context

We have seen from the chapter on [configuration fuzzing](ConfigurationFuzzer.ipynb) that one can hook into the Python runtime to observe the arguments to a function and any local variables created. We have also seen that one can obtain the context of execution by inspecting the `frame` argument. Here is a simple tracer that can return the local variables and other contextual information in a traced function.

In [None]:
def traceit(frame, event, arg):
    method_name = frame.f_code.co_name
    if method_name != "process_vehicle":
        return
    file_name = frame.f_code.co_filename
    param_names = [frame.f_code.co_varnames[i] for i in range(frame.f_code.co_argcount)]
    print(event, file_name, method_name, param_names, frame.f_locals)
    return traceit

In [None]:
import sys

In [None]:
oldtrace = sys.gettrace()
sys.settrace(traceit)
process_vehicle(VEHICLES[0])
sys.settrace(oldtrace)

### Tracer

 We expand the `traceit()` tracer to a full fledged class `Tracer` that acts as a *context manager*.

In [None]:
class Tracer:
    def __enter__(self):
        self.oldtrace = sys.gettrace()
        sys.settrace(self.trace_event)
        return self

    def __exit__(self, *args):
        sys.settrace(self.oldtrace)

    def trace_event(self, frame, event, arg):
        method_name = frame.f_code.co_name
        if method_name != "process_vehicle":
            return
        file_name = frame.f_code.co_filename
        param_names = [
            frame.f_code.co_varnames[i]
            for i in range(frame.f_code.co_argcount)
        ]
        print(event, file_name, method_name, param_names, frame.f_locals)
        return self.trace_event

 That is, any function executed under it gets a tracing hook installed.

In [None]:
with Tracer() as tracer:
    process_vehicle(VEHICLES[0])

The trace produced by executing any function can get overwhelmingly large. Hence, we need restrict our attention to specific modules. Further, we also restrict our attention exclusively to `str` variables since these variables are more likely to contain input fragments. (We will show how to deal with complex objects later.)

#### Context

First we define a `Context` class that provides easy access to the information such as the current module, and parameter names.

In [None]:
class Context:
    def __init__(self, frame, track_caller=True):
        self.method = self._method(frame)
        self.parameter_names = self._get_parameters(frame)
        self.file_name = self._file_name(frame)
        self.parent = Context(frame.f_back,
                              False) if track_caller and frame.f_back else None

    def _get_parameters(self, frame):
        return [
            frame.f_code.co_varnames[i]
            for i in range(frame.f_code.co_argcount)
        ]

    def _file_name(self, frame):
        return frame.f_code.co_filename
    
    def _method(self, frame):
        return frame.f_code.co_name

    def all_vars(self, frame):
        return frame.f_locals
    
    def __repr__(self):
        return "%s:%s(%s)" % (self.file_name, self.method, ','.join(self.parameter_names))

We then use the context to decide which modules to monitor, and which variables to trace. Additionally, we store the current *input string* so that it can be used to determine if any particular string fragments came from the current input string.

In [None]:
class Tracer(Tracer):
    def __init__(self, inputstr, files=[]):
        self.inputstr, self.files, self.trace = inputstr, files, []

    def tracing_var(self, k, v):
        return isinstance(v, str)

    def tracing_context(self, cxt, event, arg):
        if not self.files:
            return True
        return any(cxt.file_name.endswith(f) for f in self.files)

    def on_event(self, event, arg, cxt, my_vars):
        self.trace.append((event, arg, cxt, my_vars))

    def trace_event(self, frame, event, arg):
        cxt = Context(frame)
        if not self.tracing_context(cxt, event, arg):
            return self.trace_event

        my_vars = [(k, v) for k, v in cxt.all_vars(frame).items()
                   if self.tracing_var(k, v)]
        self.on_event(event, arg, cxt, my_vars)
        return self.trace_event

    def __call__(self):
        return self.inputstr

The `Tracer` class can now focus on specific kinds of events on specific files. Further, it provides a first level filter for variables that we find interesting. For example, we want to focus specifically on `string` variables that contain input fragments.

In [None]:
with Tracer(VEHICLES[0]) as tracer:
    process_vehicle(tracer())

The execution produced the following trace.

In [None]:
tracer.trace

The `settrace()` function hooks into the Python debugging facility. When it is in operation, no debugger can hook into the program. Hence, we limit the tracer to the simplest implementation possible, and implement the core of grammar mining in later stages.

### Tracker

Our tracer simply records the variable values as they occur. We next need to check if the variables contain values from the **input string**. Common ways to do this is to rely on symbolic execution or at least dynamic tainting, which are powerful, but also complex. However, one can obtain a reasonable approximation by simply relying on substring search. That is, we consider any value produced that is a substring of the original input string to have come from the original input.

One of the problems of using substring search is that short string sequences tend to be included in other string sequences even though they may not have come from the original string. That is, say the input fragment is `v`. It could have equally come from either `van` or `chevy`. We rely on being able to predict the exact place input where a given fragment occurred. Hence, we define a constant `LOOKAHEAD_LEN` such that we ignore strings up to that length.

In [None]:
LOOKAHEAD_LEN=2

The tracker identifies string fragments that are part of the input string, and stores them in a dictionary.

In [None]:
class Tracker:
    def __init__(self, inputstr, trace, **kwargs):
        self.the_vars = {}
        self.trace = trace
        self.inputstr = inputstr
        self.options(kwargs)
        self.process()

    def options(self, kwargs):
        self.log = kwargs.get('log') or False

    def include(self, var, value):
        return len(value) > LOOKAHEAD_LEN and value in self.inputstr

    def track_event(self, event, arg, cxt, my_vars):
        self.the_vars.update({k: v for k, v in my_vars if self.include(k, v)})

    def logger(self, var):
        if self.log:
            print(var)

    def process(self):
        for event, arg, cxt, my_vars in self.trace:
            self.track_event(event, arg, cxt, my_vars)

Using the tracker, we can obtain the input fragments as follows

In [None]:
tracker = Tracker(tracer.inputstr, tracer.trace)
for k,v in tracker.the_vars.items():
    print(k, '=', repr(v))

### Extracting a Derivation Tree

The input fragments from the `Tracker` can be used to obtain a derivation tree of the input.

In [None]:
import fuzzingbook_utils

In [None]:
from Grammars import START_SYMBOL, syntax_diagram

The derivation tree `Miner` is initialized with the input string, and the variable assignments, and it converts the assignments to the corresponding derivation tree.

In [None]:
class Miner:
    def __init__(self, my_input, my_assignments, **kwargs):
        self.my_input = my_input
        self.my_assignments = my_assignments
        self.options(kwargs)
        self.tree = self.get_derivation_tree()

    def options(self, kwargs):
        self.log = kwargs.get('log') or False

    def logger(self, indent, var):
        if self.log:
            print('\t' * indent, var)

The basic idea is as follows:
* We represent the derivation tree as a [straight line grammar](https://en.wikipedia.org/wiki/Straight-line_grammar) with each node represented by a key value pair. The key corresponds to the variable name, and the value corresponds to the representation of the value of the variable. The value representation may contain references to other nodes.
* We start with a derivation tree with a single node -- the start symbol and the input string as its leaf.
* For each pair _VAR_, _VALUE_ found in `my_assignments`:

1. We search for occurrences of _VALUE_ in the grammar
2. We replace them by <_VAR_>
3. We add a new rule <_VAR_> $\rightarrow$ <_VALUE_> to the grammar

In [None]:
class Miner(Miner):
    def nonterminal(self, var):
        return "<" + var.lower() + ">"

    def get_derivation_tree(self):
        tree = {START_SYMBOL: (self.my_input, )}
        my_assignments = self.my_assignments.copy()

        while True:
            new_rules = []
            for var, value in my_assignments.items():
                self.logger(0, "%s = %s" % (var, value))
                for key, repl in tree.items():
                    self.logger(1, "%s : %s" % (key, repl))
                    if not any(value in t for t in repl):
                        continue
                    alt_key = self.nonterminal(var)
                    new_arr = []
                    for k, token in enumerate(repl):
                        if not value in token:
                            new_arr.append(token)
                        else:
                            arr = token.split(value)
                            new_arr.extend(
                                list(sum(zip(arr,
                                             len(arr) * [alt_key]), ()))[:-1])
                    tree[key] = tuple(i for i in new_arr if i)
                    new_rules.append((var, alt_key, value))

            if not new_rules:
                break  # Nothing to expand anymore

            for (var, alt_key, value) in new_rules:
                tree[alt_key] = (value, )
                self.logger(0, "+%s = %s" % (alt_key, value))

                # Do not expand this again
                del my_assignments[var]

        return {key: values for key, values in tree.items()}

The `Miner` is used as follows:

In [None]:
with Tracer(VEHICLES[0]) as tracer:
    process_inventory(tracer())
assignments = Tracker(tracer.inputstr, tracer.trace).the_vars
dt = Miner(tracer.inputstr, assignments, log=True)

Converting the grammar representation to a parse tree is accomplished by the `to_tree()` method

In [None]:
class Miner(Miner):
    def to_tree(self, key=START_SYMBOL):
        if key not in self.tree:
            return (key, [])
        children = [self.to_tree(c) for c in self.tree[key]]
        return (key, children)

Combining all the pieces:

In [None]:
trees = []
for VEHICLE in VEHICLES:
    print(VEHICLE)
    with Tracer(VEHICLE) as tracer:
        process_inventory(tracer())
    assignments = Tracker(tracer.inputstr, tracer.trace).the_vars
    trees.append((tracer.inputstr, assignments))
    for var, val in assignments.items():
        print(var + " = " + repr(val))
    print()

In [None]:
from GrammarFuzzer import GrammarFuzzer, FasterGrammarFuzzer, display_tree, tree_to_string

In [None]:
csv_dt = []
for inputstr, assignments in trees:
    print(inputstr)
    dt = Miner(inputstr, assignments)
    csv_dt.append(dt)
    display_tree(dt.to_tree())

### Recovering Grammar from Derivation Trees

We define a class `Infer` that can combine multiple derivation trees to produce the grammar. The initial grammar is empty.

In [None]:
class Infer:
    def __init__(self):
        self.grammar = {}

The `add_tree()` method gets a combined list of non-terminals from current grammar, and the tree to be added to the grammar, and updates the definitions of each non-terminal.

In [None]:
class Infer(Infer):
    def add_tree(self, t):
        merged_grammar = {}
        for key in list(self.grammar.keys()) + list(t.tree.keys()):
            alternates = set(self.grammar.get(key, []))
            if key in t.tree:
                alternates.add(''.join(t.tree[key]))
            merged_grammar[key] = list(alternates)
        self.grammar = merged_grammar

The `Infer` is used as follows:

In [None]:
i = Infer()
for dt in csv_dt:
    i.add_tree(dt)

In [None]:
syntax_diagram(i.grammar)

Given execution traces from various inputs, one can define `recover_grammar()` to obtain the complete grammar from the traces.

In [None]:
def recover_grammar(traces):
    m = Infer()
    for inputstr, trace in traces:
        dt = Miner(inputstr, Tracker(inputstr, trace).the_vars)
        m.add_tree(dt)
    return m.grammar

In [None]:
traces = []
for inputstr in VEHICLES:
    with Tracer(inputstr) as tracer:
        process_vehicle(tracer())
    traces.append((tracer.inputstr, tracer.trace))
grammar = recover_grammar(traces)

### Fuzzing

In [None]:
f = GrammarFuzzer(grammar)
for i in range(10):
    print(f.fuzz())

#### Example 2. Recovering URL Grammar

In [None]:
URLS = [
    'http://user:pass@www.google.com:80/?q=path#ref',
    'https://www.cispa.saarland:80/',
    'http://www.fuzzingbook.org/#News',
]

In [None]:
from urllib.parse import urlparse, clear_cache

In [None]:
traces = []
for inputstr in URLS:
    clear_cache()
    with Tracer(inputstr, ['urllib/parse.py']) as tracer:
        urlparse(tracer())
    traces.append((tracer.inputstr, tracer.trace))
grammar = recover_grammar(traces)

In [None]:
syntax_diagram(grammar)

### Repeating elements

In [None]:
URLS_X = URLS + ['ftp://freebsd.org/releases/5.8']

In [None]:
traces = []
for inputstr in URLS_X:
    clear_cache()
    with Tracer(inputstr, ['urllib/parse.py']) as tracer:
        urlparse(tracer())
    traces.append((tracer.inputstr, tracer.trace))
grammar = recover_grammar(traces)

In [None]:
syntax_diagram(grammar)

## Grammar Miner with Stack

### Input Stack

In [None]:
class InputStack(object):
    def __init__(self, i):
        self.original = i
        self.inputs = []
        
    def height(self):
        return len(self.inputs)

    def has(self, val):
        return any(val in var for var in self.inputs[-1].values())

    def ignored(self, val):
        return not (isinstance(val, str) and len(val) > LOOKAHEAD_LEN)

    def include(self, k, val):
        if self.ignored(val):
            return False
        return self.has(val) if self.inputs else val in self.original

    def push(self, inputs):
        my_inputs = {k: v for k, v in inputs.items() if self.include(k, v)}
        self.inputs.append(my_inputs)

    def pop(self):
        return self.inputs.pop()

### Vars

We proxy the dictionary so that it will only update if it does not already contain a value.

In [None]:
class Vars(object):
    def __init__(self, stack):
        self.defs = {START_SYMBOL: stack.original}
        self.istack = stack
        
    def set_kv(self, k, v):
        if k not in self.defs:
            self.defs[k] = v

    def update(self, v):
        for k,v in v.items():
            self.set_kv(k,v)

### Stack Tracker

In [None]:
class StackTracker(Tracker):
    def __init__(self, inputstr, trace, **kwargs):
        self.istack = InputStack(inputstr)
        self.the_vars = Vars(self.istack)
        self.trace = trace
        self.options(kwargs)
        self.process()

    def options(self, kwargs):
        self.track_params = kwargs.get('track_params') or True
        self.track_vars = kwargs.get('track_vars') or True
        self.track_return = kwargs.get('track_return') or False

    def include(self, var, value):
        return self.istack.include(var, value)

    def get_params(self, cxt, all_vars):
        return {
            "%s:%s" % (cxt.method, k): v
            for k, v in all_vars if k in cxt.parameter_names
        }

    def on_call(self, arg, cxt, my_vars):
        my_parameters = {
            k: v
            for k, v in self.get_params(cxt, my_vars).items()
            if not self.istack.ignored(v)
        }
        self.istack.push(my_parameters)
        if self.track_params:
            self.the_vars.update(my_parameters)

    def on_line(self, arg, cxt, my_vars):
        if self.track_vars:
            qvars = {"%s:%s" % (cxt.method, k): v for k, v in my_vars}
            my_vars = {
                var: value
                for var, value in qvars.items() if self.include(var, value)
            }
            if not self.track_params:
                my_vars = {
                    var: value
                    for var, value in my_vas.items() if var not in param_names
                }
            self.the_vars.update(my_vars)

    def on_return(self, arg, cxt, my_vars):
        self.istack.pop()
        self.on_line(arg, cxt, my_vars)
        if self.track_return:
            var = '(<-%s)' % cxt.method
            if self.include(var, arg):
                self.the_vars.update({var: arg})

    def track_event(self, event, arg, cxt, my_vars):
        if event == 'call':
            return self.on_call(arg, cxt, my_vars)

        if event == 'return':
            return self.on_return(arg, cxt, my_vars)

        if event == 'exception':
            return

        self.on_line(arg, cxt, my_vars)

We need to modify `traceit()` to be aware of events now:

In [None]:
url_traces = []
for inputstr in URLS:
    clear_cache()
    with Tracer(inputstr, ['urllib/parse.py']) as tracer:
        urlparse(tracer())
    sm = StackTracker(tracer.inputstr, tracer.trace)
    url_traces.append((tracer.inputstr, sm))
    for k,v in sm.the_vars.defs.items():
        print(k, v)
    print()

Note that in the following we do not account for parameters getting reassigned values.

### Miner

For each (VAR, VALUE) found:
* We search for occurrences of VALUE in the grammar
* We replace them by VAR
* We add a new rule VAR -> VALUE to the grammar

In [None]:
class Miner(Miner):
    def get_derivation_tree(self):
        my_assignments = self.my_assignments.copy()
        tree = {}
        for var, value in my_assignments.items():
            nt_var = var if var == START_SYMBOL else self.nonterminal(var)
            self.logger(0, "%s = %s" % (nt_var, value))
            if tree:
                append = False
                for key, repl in tree.items():
                    self.logger(1, "%s : %s" % (key, repl))
                    if not any(value in t for t in repl):
                        continue
                    new_arr = []
                    for k, token in enumerate(repl):
                        if not value in token:
                            new_arr.append(token)
                        else:
                            append = True
                            arr = token.split(value)
                            new_arr.extend(
                                list(sum(zip(arr,
                                             len(arr) * [nt_var]), ()))[:-1])
                    tree[key] = tuple(i for i in new_arr if i)
                if append:
                    self.logger(0, "+%s = %s" % (nt_var, value))
                    tree[nt_var] = set([value])
            else:
                tree[nt_var] = (value, )
        return  {key: values for key, values in tree.items()}

In [None]:
clear_cache()
with Tracer(URLS[2], ['urllib/parse.py']) as tracer:
    urlparse(tracer())
sm = StackTracker(tracer.inputstr, tracer.trace)
dt = Miner(tracer.inputstr, sm.the_vars.defs)
display_tree(dt.to_tree())

In [None]:
def recover_grammar(traces):
    m = Infer()
    for inputstr, trace in traces:
        st = StackTracker(inputstr, trace)
        dt = Miner(inputstr, st.the_vars.defs)
        m.add_tree(dt)
    return m.grammar

In [None]:
traces = []
for inputstr in URLS_X:
    clear_cache()
    with Tracer(inputstr, ['urllib/parse.py']) as tracer:
        urlparse(tracer())
    traces.append((tracer.inputstr, tracer.trace))
grammar = recover_grammar(traces)
syntax_diagram(grammar)

In [None]:
with Tracer(INVENTORY) as tracer:
    process_inventory(tracer())
sm = StackTracker(tracer.inputstr, tracer.trace)
for k,v in sm.the_vars.defs.items():
    print(k,' = ',v)
dt = Miner(tracer.inputstr, sm.the_vars.defs)
display_tree(dt.to_tree())

## Tainted Miner

In [None]:
from InformationFlow import tstr, ctstr, rewrite_in, Instr

In [None]:
CURRENT_METHOD = None

In [None]:
def get_current_method():
    return tuple(CURRENT_METHOD)


def set_current_method(method, method_stack):
    global CURRENT_METHOD
    CURRENT_METHOD = (method, len(method_stack), method_stack[-1])
    return tuple(CURRENT_METHOD)

In [None]:
class xtstr(ctstr):
    def add_instr(self, op, c_a, c_b):
        ct = None
        if len(c_a) == 1 and isinstance(c_a, xtstr):
            ct = c_a.taint[0]
        elif len(c_b) == 1 and isinstance(c_b, xtstr):
            ct = c_b.taint[0]
        self.comparisons.append((ct, Instr(op, c_a, c_b), get_current_method()))

    def create(self, res, taint):
        o = xtstr(res, taint, self)
        o.comparisons = self.comparisons
        return o

First, we expand any object to a list of variables.

In [None]:
def flatten(key, val):
    # Should we limit flatened objects to repr ~ tstr here or during call?
    tv = type(val)
    if isinstance(val, (int, float, complex, str, bytes, bytearray)):
        return [(key, val)]
    elif isinstance(val, (set, frozenset, list, tuple, range)):
        values = [e for i, elt in enumerate(val) for e in flatten(i, elt)]
        return [("%s.%d" % (key, i), v) for i, v in values]
    elif isinstance(val, dict):
        values = [e for k, elt in val.items() for e in flatten(k, elt)]
        return [("%s.%s" % (key, k), v) for k, v in values]
    elif isinstance(val, tstr):
        return [(key, val)]
    elif hasattr(val,'__dict__'):
        values = [e for k, elt in val.__dict__.items() for e in flatten(k, elt)]
        return [("%s.%s" % (key, k), v) for k, v in values]
    else:
        return [(key, repr(v))]

### Tainted Stack

For a simple miner, we do not need the input stack. All that we need is the ability to identify reassignments in variables. However, it makes life a little simpler if we can annotate variables with the stack depth.

In [None]:
class TaintedStack(InputStack):
    # height has ignored include push pop
    def __init__(self, i):  # same as original
        self.original = i
        self.inputs = []

    # has is used only with include
    def has(self, val):
        assert False

    def ignored(self, val):
        return not (isinstance(repr(val), tstr))

    # used from push (and Tracker)
    def include(self, k, val):
        if self.ignored(val):
            return False
        return val.taint_in(self.original)

One way to account for custom data structures other than containers is to rely on its `repr()`. That is, both `str()` and `repr()` relies on string methods that we have overridden in the tainted string. Hence if any of the string fragments are tainted, their return will also tainted.

### Tainted Vars

In [None]:
class TaintedVars(Vars):
    def __init__(self, stack):
        self.accessed_seq_var = {}
        self.taint_register = {}
        self.method_id = None
        super().__init__(stack)

In [None]:
class TaintedVars(TaintedVars):
    def update(self, values):
        vals = [(k1, v1) for k, v in values.items() for k1, v1 in flatten(k, v)]
        for k, v in vals:
            self.set_kv(k, v)

In [None]:
CURRENT_METHOD = None

In [None]:
class TaintedVars(TaintedVars):
    def set_current_method(self, method):
        self.method_id = method

    def var_init(self, var):
        if var not in self.accessed_seq_var:
            self.accessed_seq_var[var] = 0

    def var_assign(self, var):
        self.accessed_seq_var[var] += 1

    def var_name(self, var):
        var_seq = self.accessed_seq_var[var]
        # TODO: figure out how to deal with stack/vars stack height
        method, stack, method_seq = self.method_id
        return "%s[stack:%d mseq:%d vseq:%d]" % (var, stack, method_seq, var_seq)

    def set_kv(self, var, val):
        self.var_init(var)
        sa_var = self.var_name(var)
        if sa_var not in self.defs:
            self.defs[sa_var] = val
            self.taint_register[str(
                val.taint)] = self.taint_register.get(str(val.taint)) or set()
            self.taint_register[str(val.taint)].add(sa_var)
        else:  # possible reassignment
            tainted_vars = self.taint_register.get(str(val.taint))
            if tainted_vars is None or sa_var not in tainted_vars:  # a change in taint
                self.var_assign(var)
                sa_var = self.var_name(var)
                self.defs[sa_var] = val
                self.taint_register[str(val.taint)] = self.taint_register.get(
                    str(val.taint)) or set()
                self.taint_register[str(val.taint)].add(sa_var)

### Tainted Tracer

In [None]:
class TaintedTracer(Tracer):
    def __init__(self, inputstr, restrict={}):
        self.inputstr = xtstr(inputstr, parent=None).with_comparisons([])
        self.trace = []
        self.restrict = restrict

        self.method_num = 0
        self.method_num_stack = [self.method_num]

    def tracing_var(self, k, v):
        return isinstance(repr(v), tstr)

    def tracing_context(self, cxt, event, arg):
        if self.restrict.get('files'):
            return any(
                cxt.file_name.endswith(f) for f in self.restrict['files'])
        if self.restrict.get('methods'):
            return cxt.method in self.restrict['methods']
        return True

    def on_event(self, event, arg, cxt, my_vars):
        super().on_event(event, arg, cxt, my_vars)
        if event == 'call':
            self.method_num += 1
            self.method_num_stack.append(self.method_num)
        elif event == 'return':
            self.method_num_stack.pop()
        set_current_method(cxt.method, self.method_num_stack)

### Tainted Tracker

In [None]:
class TaintedTracker(StackTracker):
    def __init__(self, inputstr, trace, **kwargs):
        self.istack = TaintedStack(inputstr)
        self.inputstr = inputstr
        self.the_vars = TaintedVars(self.istack)
        self.trace = trace
        self.mstack = ['']
        self.method_register = {}


        self.options(kwargs)
        self.process()

    def call_enter(self, method):
        key = (method, len(self.mstack))
        if key not in self.method_register:
            self.method_register[key] = 0
        else:
            self.method_register[key] += 1
        method_id = (*key, self.method_register[key])
        print("->", "\t" * len(self.mstack), method_id)
        self.mstack.append(method_id)
        self.the_vars.set_current_method(self.mstack[-1])

    def call_exit(self, method):
        m = self.mstack.pop()
        print("<-", "\t" * len(self.mstack), m)
        self.the_vars.set_current_method(self.mstack[-1])

    def on_call(self, arg, cxt, my_vars):
        self.call_enter(cxt.method)
        super().on_call(arg, cxt, my_vars)

    def on_return(self, arg, cxt, my_vars):
        super().on_return(arg, cxt, my_vars)
        self.call_exit(cxt.method)

In [None]:
def recover_grammar(traces):
    m = Infer()
    for inputstr, trace in traces:
        st = TaintedTracker(inputstr, trace)
        dt = Miner(inputstr, st.the_vars.defs)
        m.add_tree(dt)
    return m.grammar

In [None]:
restrict = {'methods':['process_inventory', 'process_vehicle', 'process_car', 'process_van']}
with TaintedTracer(INVENTORY, restrict) as tracer:
    process_inventory(tracer())
sm = TaintedTracker(tracer.inputstr, tracer.trace)
#grammar = recover_grammar(traces)
for k, v in sm.the_vars.defs.items():
    print("%s = <%s> \t %s" % (k,v, len(v.taint)))

### Tainted Miner

In [None]:
class TaintedMiner(Miner):
    def get_derivation_tree(self):
        my_assignments = self.my_assignments.copy()
        root = (START_SYMBOL[1:-1], my_assignments[START_SYMBOL], [])
        del my_assignments[START_SYMBOL]
        for k, v in my_assignments.items():
            self.insert_into_tree(root, (k, v, []))
        return root

In [None]:
class TaintedMiner(TaintedMiner):
    def insert_into_tree(self, root, elt):
        # for each children of root, see if we are a
        # subset of taints. If none, add ourself as a first level childA

        ek, ev, eitems = elt
        first_level = True
        # first, look through the children to figure out if any one is a super set.
        for child in root[2]:
            if ev.taint_in(child[1]):
                assert first_level
                first_level = False
                self.insert_into_tree(child, elt)

        if not first_level:
            return root

        # then look through the children to figure out *how many* of them are subsets.
        children = list(root[2])
        new_children = []
        while children:
            child, *children = children
            if child[1].taint_in(ev):
                first_level = False
                self.insert_into_tree(elt, child)
            else:
                new_children.append(child)

        if not first_level:
            new_children.append(elt)
            root[2][:] = new_children
            return

        # neither superset or subset. Time to just append.
        # check for overlap.
        children = root[2]
        for c in children:
            for t in c[1].taint:
                if t in ev.taint:
                    assert False
        root[2].append((ek, ev, eitems))

Ensure that all children are in the right positions. In particular, assert that there are no overlaps. (If there are overlaps, we might have to choose between one of the overlapped elements based on the height of the tree of that element. -- Remember that each element is inserted as a child to *all* matching elements, and not just the first matching one. Hence, we can afford to choose between the trees and not worry about transferring elements across.

In [None]:
import re

In [None]:
def to_tree(tree):
    def simplify(var):
        return re.sub(r'\[.+\]', '', var)
    taint_start, taint_stop = tree[1].taint[0], tree[1].taint[-1]

    new_children = []
    prev_child = None
    for child in sorted(tree[2], key=lambda c: c[1].taint[0]):
        if prev_child:
            assert prev_child[1].taint[0] < child[1].taint[0]
        prev_child = child
        child_t_start, child_t_stop = child[1].taint[0], child[1].taint[-1]
        if child_t_start > taint_start:
            elt = ''.join(tree[1].x(slice(taint_start, child_t_start)))
            new_children.append(("<%s>" % repr(elt), [(repr(elt), [])]))
        new_children.append(to_tree(child))
        taint_start = child_t_stop + 1
    if taint_start < taint_stop+1:
        elt = ''.join(tree[1].x(slice(taint_start, taint_stop + 1)))
        new_children.append((elt, []))
    return ("<%s>" % simplify(tree[0]), new_children)

In [None]:
restrict = {'methods':['process_inventory', 'process_vehicle', 'process_car', 'process_van']}
with TaintedTracer(INVENTORY, restrict) as tracer:
    process_inventory(tracer())
sm = TaintedTracker(tracer.inputstr, tracer.trace, track_return=True)
dt = TaintedMiner(tracer.inputstr, sm.the_vars.defs)
display_tree(to_tree(dt.tree))

In [None]:
import string

In [None]:
def parse_num(s,i):
    n = ''
    while s[i:] and s[i].in_(list(string.digits)):
        n += s[i]
        i = i +1
    return i,n

def parse_paren(s, i):
    assert s[i] == '('
    i, v = parse_expr(s, i+1)
    if s[i:] == '':
        raise Exception(s, i)
    assert s[i] == ')'
    return i+1, v


def parse_expr(s, i = 0):
    expr = []
    while s[i:]:
        c = s[i]
        if c.in_(list(string.digits)):
            i,num = parse_num(s,i)
            expr.append(num)
        elif c.in_(['+', '-', '*', '/']):
            expr.append(c)
            i = i + 1
        elif c == '(':
            i, cexpr = parse_paren(s, i)
            expr.append(cexpr)
        elif c == ')':
            return i, expr
        else:
            raise Exception(s,i)
    return i, expr

In [None]:
EXPRS = [
    '12+23',
    '(25+1)+100+(33+2)+1',
    '(25-1/(2+3))*100/3'
]

In [None]:
parse_expr(xtstr(EXPRS[1]).with_comparisons([]))

In [None]:
def to_tree(tree):
    key, val, children = tree
    taint_start, taint_stop = val.taint[0], val.taint[-1]

    new_children = []
    for child in sorted(children, key=lambda c: c[1].taint[0]):
        ckey, cval, cchildren = child
        child_t_start, child_t_stop = cval.taint[0], cval.taint[-1]
        # if child taint starts after the root taint, then we have a child
        # missing with index from taint_start to child_taint_start(non-incl)
        if child_t_start > taint_start:
            elt = ''.join(val.x(slice(taint_start, child_t_start)))
            new_children.append(("<%s>" % repr(elt), [(repr(elt), [])]))
        # and we append child as before.
        new_children.append(to_tree(child))
        # the new taint_start begins next to the taint stop.
        taint_start = child_t_stop + 1
    if taint_start < taint_stop + 1:
        elt = ''.join(val.x(slice(taint_start, taint_stop + 1)))
        new_children.append((elt, []))
    return ("<%s>" % key, new_children)

In [None]:
print(EXPRS[1])
restrict = {'methods':['parse_expr', 'parse_paren', 'parse_num']}
with TaintedTracer(EXPRS[1], restrict) as tracer:
    parse_expr(tracer())
sm = TaintedTracker(tracer.inputstr, tracer.trace)
dt = TaintedMiner(tracer.inputstr, sm.the_vars.defs)
display_tree(to_tree(dt.tree))

In [None]:
import pprint
import json
pp = pprint.PrettyPrinter(indent=4)

In [None]:
def parse_trees(trace, inputstr):
    # name,stack,children,idxs
    def new_node(s, stack, mid=None, indexes=None):
        n = {
            'sym': '<%s>' % s,
            'id': mid,
            'stack': stack,
            'indexes':  [],#[inputstr[i] for i in indexes] if indexes is not None else [],
            'children': []
        }
        for i in (indexes or []):
            n_addindex(n, i)
        return n

    root = new_node('start', stack=0, mid=0)
    method_stack = [root]

    def n_addchild(node, n): node['children'].append(n)
    def n_mid(node):      return node['id']
    def n_sym(node):      return node['sym']
    def n_addindex(node, i):
        c = inputstr[i]
        #node['indexes'].append(c)
        if node['children']:
            last_child = node['children'][-1]
            if last_child.get('id') is None: # last child an index. Hence, append
                last_child['indexes'].append(c)
                last_child['sym'] = ''.join(last_child['indexes'])
            else: # last child a method. Hence, start a new node
                n_addchild(node, {'sym': c, 'indexes': [c]})
        else: #no last_child. Start a new node
            n_addchild(node, {'sym': c, 'indexes': [c]})

    print(inputstr)
    prev_idx = None
    prev_node = None
    last_cmp_only = []
    for idx, instr, (method_name, stack_len, mid) in trace:
        if idx == prev_idx:
            prev_node = (idx, method_name, stack_len, mid)
        else:
            if prev_node:
                last_cmp_only.append(prev_node)
            prev_node = (idx, method_name, stack_len, mid)
            prev_idx = idx
    last_cmp_only.append(prev_node)
    for idx, method_name, stack_len, mid in last_cmp_only:
        print("%2d" % idx, " ", inputstr[idx], '\t|' * stack_len, method_name,
              mid, "(%d)" % stack_len)
    print()
    for idx, method_name, stack_len, mid in last_cmp_only:#[:4]:
        last_idx = len(method_stack) -1
        print(inputstr)
        print("%2d" % idx, " chr:%s" % inputstr[idx], '\t|' * stack_len, "\t",
              method_name, mid, "(%d)" % stack_len)
        if stack_len > last_idx:
            print('###### append parent')
            # first, generate a chain of the given stack length to the parent.
            for i in range(len(method_stack), stack_len):
                node = new_node('*',stack=i, mid=None)
                n_addchild(method_stack[-1],node)
                method_stack.append(node)

            assert len(method_stack) == stack_len
            # then for the given parent, add a child
            node = new_node(method_name, stack=stack_len, mid=mid, indexes=[idx])
            current = method_stack[-1]
            n_addchild(current,node)
            method_stack.append(node)
            print("ROOT:")
            print(json.dumps(root, indent=2))
            print('STACK:', ["%d:%s" % (i['stack'], i['sym']) for i in method_stack])
        elif stack_len == last_idx:
            current = method_stack[-1]
            if n_mid(current) == mid:
                print('###### insert char')
                assert n_sym(current) == "<%s>" % method_name
                current = method_stack[-1]
                n_addindex(current, idx)
            else:
                print('###### start new peer node')
                # a peer of this node. So get parent
                method_stack.pop()
                current = method_stack[-1]
                node = new_node(method_name, stack=stack_len, mid=mid, indexes=[idx])
                n_addchild(current,node)
            print("ROOT:")
            print(json.dumps(root, indent=2))
            print('STACK:', ["%d:%s" % (i['stack'], i['sym']) for i in method_stack])
        elif stack_len < last_idx:
            print('###### pop')
            current = method_stack[-1]
            print("-", current)
            while stack_len < last_idx:
                print('<-')
                method_stack.pop()
                last_idx = len(method_stack)-1
            assert stack_len == last_idx
            current = method_stack[-1]
            
            print('###### insert char')
            if n_mid(current) is None:
                current['id'] = mid
                current['sym'] = "<%s>" % method_name
            else:
                assert n_mid(current) == mid, "%s != %s" %(n_mid(current), mid)
                assert n_sym(current) == "<%s>" % method_name
            current = method_stack[-1]
            n_addindex(current, idx)
            
            print("ROOT:")
            print(json.dumps(root, indent=2))
            print('STACK:', ["%d:%s" % (i['stack'], i['sym']) for i in method_stack])
            
    print()
    print("last:", root)
    return root


In [None]:
my_root = parse_trees(tracer.inputstr.comparisons, str(tracer.inputstr))

In [None]:
def to_tree(node, my_str):
    sym = node['sym']
    children = node.get('children')
    if children:
        return (sym, [to_tree(c, my_str) for c in children])
    else:
        return (sym, [])

In [None]:
to_tree(my_root, str(tracer.inputstr))

In [None]:
print(tracer.inputstr)
display_tree(to_tree(my_root, str(tracer.inputstr)))

## Lessons Learned

* Given a set of inputs, we can learn an input grammar by examining variable values during execution.
* The resulting grammars can be used right during fuzzing.
* TODO: make the point that our initial implementation is about learning regular grammar not CFG because we do not know how to handle mutually recursive and looping procedures
* TODO: Use process_vehicle as a pervading example.
* TODO: Mention that control flow dependencies is not tracked in dynamic taints. But it is tracked in simple miner with string inclusion.

## Next Steps

_Link to subsequent chapters (notebooks) here, as in:_

* [use _mutations_ on existing inputs to get more valid inputs](MutationFuzzer.ipynb)
* [use _grammars_ (i.e., a specification of the input format) to get even more valid inputs](Grammars.ipynb)
* [reduce _failing inputs_ for efficient debugging](Reducer.ipynb)


## Background

\cite{Lin2008}

## Exercises

_Close the chapter with a few exercises such that people have things to do.  To make the solutions hidden (to be revealed by the user), have them start with_

```markdown
**Solution.**
```

_Your solution can then extend up to the next title (i.e., any markdown cell starting with `#`)._

_Running `make metadata` will automatically add metadata to the cells such that the cells will be hidden by default, and can be uncovered by the user.  The button will be introduced above the solution._

### Exercise 1: _Title_

_Text of the exercise_

In [None]:
# Some code that is part of the exercise

_Some more text for the exercise_

**Solution.** _Some text for the solution_

In [None]:
# Some code for the solution
2 + 2

_Some more text for the solution_

### Exercise 2: _Title_

_Text of the exercise_

**Solution.** _Solution for the exercise_