# Mining Input Grammars

So far, the grammars we have seen have been mostly specified manually; that is, you (or the person knowing the input format) had to design and write a grammar in the first place. While the grammars we have seen so far have been rather simple, creating a grammar for complex inputs can involve quite some effort. In this chapter, we therefore introduce techniques that automatically _mine_ grammars from programs by executing the programs and observing how they process which parts of the input. In conjunction with a grammar fuzzer, this allows us to (1) take a program, (2) extract its input grammar, and (3) fuzz it with high efficiency and effectiveness.

**Prerequisites**

* You should have read the [chapter on grammars](Grammars.ipynb).
* The [chapter on configuration fuzzing](ConfigurationFuzzer.ipynb) introduces grammar mining for configuration options, as well as observing variables and values during execution.

In [None]:
import fuzzingbook_utils

In [None]:
import sys

In [None]:
def logger(indent, var, log):
    if log:
        print('\t' * indent, var)

## A Simple Grammar Miner

Say we want to obtain the grammar for the function `urlparse` from the *Python* distribution.

### Function Under Test

In [None]:
from urllib.parse import urlparse, clear_cache
FUNCTION = urlparse

### Recording Occurrences of Input Values

We have a few inputs that can be used, as listed below. For each input, a `Tracer` records the string variables seen during execution, and a `Tracker` keeps those whose values occur in the input string.

In [None]:
INPUTS = [
    'http://user:pass@www.google.com:80/?q=path#ref',
    'https://www.cispa.saarland:80/',
    'http://www.fuzzingbook.org/#News',
]

#### Get qualified name of a variable

In [None]:
class Context:
    def __init__(self, frame, track_caller=True):
        self.method = self._method(frame)
        #self.class_name = self._class_name(frame)
        self.parameter_names = self._get_parameters(frame)
        self.file_name = self._file_name(frame)
        self.parent = Context(frame.f_back,
                              False) if track_caller and frame.f_back else None

    def _class_name(self, frame):
        class_name = frame.f_code.co_name
        if frame.f_code.co_name == '__new__':
            class_name = frame.f_locals[frame.f_code.co_varnames[0]].__name__
        return class_name

    def _get_parameters(self, frame):
        return [
            frame.f_code.co_varnames[i]
            for i in range(frame.f_code.co_argcount)
        ]

    def _file_name(self, frame):
        return frame.f_code.co_filename
    
    def _method(self, frame):
        return frame.f_code.co_name

    def all_vars(self, frame):
        return frame.f_locals
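
As a minimal illustration of what `Context` reads from a frame, the following sketch extracts the same attributes from the frame of a running function. The helper `frame_summary()` and the sample function `split_url()` are hypothetical names used only for this example, and `sys._getframe()` is a CPython-specific way to obtain the current frame.

```python
import sys


def frame_summary(frame):
    """Return (function name, parameter names, file name) for a frame."""
    code = frame.f_code
    # The first co_argcount entries of co_varnames are the positional parameters
    params = list(code.co_varnames[:code.co_argcount])
    return code.co_name, params, code.co_filename


def split_url(url, default_scheme='http'):
    rest = url  # a local variable; it is not a parameter
    return frame_summary(sys._getframe())


name, params, filename = split_url('http://www.fuzzingbook.org/')
print(name, params)
```

Local variables such as `rest` follow the parameters in `co_varnames`, which is why the slice stops at `co_argcount`.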

The method `Tracer.traceit()` records all string variables and their values occurring during execution. The `Tracker` then keeps only the *nontrivial* ones: those longer than 2 characters whose value occurs in the input string.

In [None]:
class Tracer:
    def __init__(self, inputstr):
        self.inputstr, self.trace = inputstr, []

    def __enter__(self):
        self.oldtrace = sys.gettrace()
        sys.settrace(self.traceit)
        return self

    def __exit__(self, *args):
        sys.settrace(self.oldtrace)

    def include(self, k, v):
        return isinstance(v, str)

    def traceit(self, frame, event, arg):
        cxt = Context(frame)
        my_vars = [(k, v) for k, v in cxt.all_vars(frame).items()
                   if self.include(k, v)]
        self.trace.append((event, arg, cxt, my_vars))
        return self.traceit

    def __call__(self):
        return self.inputstr

In [None]:
class Tracker:
    def __init__(self, inputstr, trace, **kwargs):
        self.the_vars = {}
        self.trace = trace
        self.inputstr = inputstr
        self.options(kwargs)
        self.process()
        
    def options(self, kwargs):
        pass

    def include(self, var, value):
        return len(value) > 2 and value in self.inputstr

    def trace_event(self, event, arg, ctx, my_vars):
        self.the_vars.update({k: v for k, v in my_vars if self.include(k, v)})

    def process(self):
        for event, arg, cxt, my_vars in self.trace:
            self.trace_event(event, arg, cxt, my_vars)
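
The interplay of recording and filtering can be sketched in a single self-contained function. This is a simplified sketch rather than the classes above: `trace_string_vars()` and `split_pair()` are hypothetical names, and recording and filtering happen in one step instead of two.

```python
import sys


def trace_string_vars(func, inputstr):
    """Run func(inputstr); collect string locals that occur in inputstr."""
    seen = {}

    def tracer(frame, event, arg):
        for name, value in frame.f_locals.items():
            # Keep nontrivial strings that are fragments of the input
            if isinstance(value, str) and len(value) > 2 and value in inputstr:
                seen[name] = value
        return tracer  # keep tracing inside this frame

    old_trace = sys.gettrace()
    sys.settrace(tracer)
    try:
        func(inputstr)
    finally:
        sys.settrace(old_trace)  # always restore the previous trace function
    return seen


def split_pair(text):
    key, _, value = text.partition('=')
    return key, value


found = trace_string_vars(split_pair, 'name=fuzzing')
print(found)
```

The separator `_` is dropped because it is shorter than three characters; `key` and `value` are kept because both are fragments of the input.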

### Trace

The `Tracer` context manager hooks into Python's trace functionality through `sys.settrace()`.

In [None]:
clear_cache()
with Tracer(INPUTS[0]) as tracer:
    FUNCTION(tracer())

tracker = Tracker(tracer.inputstr, tracer.trace)
for k,v in tracker.the_vars.items():
    print(k, '=', repr(v))

### Extracting a Derivation Tree

In [None]:
from Grammars import START_SYMBOL, syntax_diagram

Convert a variable name into a grammar nonterminal:

In [None]:
def nonterminal(var):
    return "<" + var.lower() + ">"

Now, for each pair _VAR_, _VALUE_ found:

1. We search for occurrences of _VALUE_ in the grammar
2. We replace them by <_VAR_>
3. We add a new rule <_VAR_> $\rightarrow$ _VALUE_ to the grammar

In [None]:
def get_derivation_tree(my_input, my_assignments, log=False):
    # Here's our initial tree
    tree = {START_SYMBOL: (my_input, )}
    my_assignments = my_assignments.copy()

    # Replace as listed above
    while True:
        new_rules = []
        for var, value in my_assignments.items():
            logger(0, "%s = %s" % (var, value), log)
            for key, repl in tree.items():
                logger(1, "%s : %s" % (key, repl), log)
                if not any(value in t for t in repl):
                    continue
                alt_key = nonterminal(var)
                new_arr = []
                for k, token in enumerate(repl):
                    if not value in token:
                        new_arr.append(token)
                    else:
                        # Replace value by nonterminal name
                        arr = token.split(value)
                        new_arr.extend(
                            list(sum(zip(arr,
                                         len(arr) * [alt_key]), ()))[:-1])
                tree[key] = tuple(i for i in new_arr if i)
                new_rules.append((var, alt_key, value))

        if not new_rules:
            break  # Nothing to expand anymore

        for (var, alt_key, value) in new_rules:
            # Add new rule to tree
            tree[alt_key] = (value, )
            logger(0, "+%s = %s" % (alt_key, value), log)

            # Do not expand this again
            del my_assignments[var]

    return {key: values for key, values in tree.items()}
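
The replacement step at the heart of this function (splitting a token at each occurrence of a value and interleaving the nonterminal) can be sketched in isolation. The helper name `replace_value()` is hypothetical; it uses an explicit loop where the function above uses `zip()` and `sum()`.

```python
def replace_value(token, value, nonterminal_name):
    """Split token at each occurrence of value and insert the nonterminal."""
    parts = token.split(value)
    result = []
    for i, part in enumerate(parts):
        if i > 0:
            # One nonterminal between each pair of neighbouring parts
            result.append(nonterminal_name)
        if part:
            # Skip empty strings, which arise when value starts or ends the token
            result.append(part)
    return tuple(result)


expansion = replace_value('http://www.fuzzingbook.org/#News',
                          'www.fuzzingbook.org', '<netloc>')
print(expansion)
```

Here the single token becomes `('http://', '<netloc>', '/#News')`, and the rule `<netloc>` would then expand to `www.fuzzingbook.org`.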

First, trace the execution:

In [None]:
clear_cache()
with Tracer(INPUTS[0]) as tracer:
    FUNCTION(tracer())

In [None]:
assignments = Tracker(tracer.inputstr, tracer.trace).the_vars
# Print the assignments we just extracted (not those of an earlier tracker)
for var, val in assignments.items():
    print(var + " = " + repr(val))

In [None]:
dt0 = get_derivation_tree(tracer.inputstr, assignments)
for k, v in dt0.items():
    print(k, ' = ', v)

In [None]:
clear_cache()
with Tracer(INPUTS[1]) as tracer:
    FUNCTION(tracer())
dt1 = get_derivation_tree(tracer.inputstr,
                          Tracker(tracer.inputstr, tracer.trace).the_vars)
for k, v in dt1.items():
    print(k, ' = ', v)

In [None]:
clear_cache()
with Tracer(INPUTS[2]) as tracer:
    FUNCTION(tracer())
dt2 = get_derivation_tree(tracer.inputstr,
                          Tracker(tracer.inputstr, tracer.trace).the_vars)
for k, v in dt2.items():
    print(k, ' = ', v)

### Recovering Grammar from Derivation Trees

In [None]:
def to_grammar(t):
    return {k:[''.join(v)] for k,v in t.items()}

In [None]:
def add_tree(g, t):
    merged_grammar = {}
    for key in list(g.keys()) + list(t.keys()):
        alternates = set(g.get(key, []))
        if key in t:
            alternates.add(''.join(t[key]))
        merged_grammar[key] = list(alternates)
    return merged_grammar

In [None]:
add_tree(to_grammar(dt1), dt2)
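
To see what merging does, here is a small self-contained sketch with hand-written rules. The function `merge_rules()` is a hypothetical variant of `add_tree()` that keeps alternatives in a list, so their order is predictable; the grammar and tree below are made up for illustration.

```python
def merge_rules(grammar, tree):
    """Merge the expansions of a derivation tree into a grammar."""
    merged = {}
    for key in list(grammar) + list(tree):
        alternatives = list(grammar.get(key, []))
        if key in tree:
            expansion = ''.join(tree[key])
            if expansion not in alternatives:
                alternatives.append(expansion)  # a new alternative for key
        merged[key] = alternatives
    return merged


g = {'<start>': ['<scheme>://<netloc>/'], '<scheme>': ['https']}
t = {'<start>': ('<scheme>', '://', '<netloc>', '/#', '<fragment>'),
     '<scheme>': ('http',)}
merged = merge_rules(g, t)
for key, alternatives in merged.items():
    print(key, '::=', ' | '.join(alternatives))
```

Each nonterminal accumulates one alternative per distinct expansion observed across the inputs.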

In [None]:
def recover_grammar(traces):
    merged_grammar = {}
    for inputstr, trace in traces:
        tree = get_derivation_tree(inputstr, Tracker(inputstr, trace).the_vars)
        merged_grammar = add_tree(merged_grammar, tree)
    return merged_grammar

In [None]:
traces = []
for inputstr in INPUTS:
    clear_cache()
    with Tracer(inputstr) as tracer:
        FUNCTION(tracer())
    traces.append((tracer.inputstr, tracer.trace))

grammar = recover_grammar(traces)
for k,v in grammar.items():
    print(k, ':= ', "\n\t|".join([str(s) for s in v]))

In [None]:
syntax_diagram(grammar)

### Fuzzing

In [None]:
from GrammarFuzzer import GrammarFuzzer

In [None]:
f = GrammarFuzzer(grammar)
for i in range(10):
    print(f.fuzz())
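
For readers who want to see the principle without `GrammarFuzzer`, the following sketch expands a grammar by picking random alternatives. It is a deliberately naive stand-in, not the `GrammarFuzzer` algorithm: `expand()` is a hypothetical helper, the grammar is hand-written, and the recursion only terminates for grammars without unbounded recursion.

```python
import random
import re

RE_NONTERMINAL = re.compile(r'(<[^<> ]*>)')


def expand(grammar, symbol='<start>', rng=random):
    """Expand symbol by choosing a random alternative for each nonterminal."""
    expansion = rng.choice(grammar[symbol])
    # Splitting on a capturing group keeps the nonterminals in the result
    parts = RE_NONTERMINAL.split(expansion)
    return ''.join(expand(grammar, part, rng) if part in grammar else part
                   for part in parts)


toy_grammar = {
    '<start>': ['<scheme>://<netloc>/'],
    '<scheme>': ['http', 'https'],
    '<netloc>': ['www.fuzzingbook.org', 'www.cispa.saarland:80'],
}

rng = random.Random(0)
samples = [expand(toy_grammar, rng=rng) for _ in range(5)]
for sample in samples:
    print(sample)
```

Every produced string combines fragments taken from different sample inputs, which is exactly how a mined grammar yields new inputs.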

## Grammar Miner with Stack

### Keep Track of The Stack

In [None]:
class InputStack(object):
    def __init__(self, i):
        self.original = i
        self.inputs = []

    def has(self, val):
        return any(val in var for var in self.inputs[-1].values())

    def ignored(self, val):
        return not (isinstance(val, str) and len(val) > 2)

    def include(self, k, val):
        if self.ignored(val):
            return False
        return self.has(val) if self.inputs else val in self.original

    def push(self, inputs):
        my_inputs = {k: v for k, v in inputs.items() if self.include(k, v)}
        self.inputs.append(my_inputs)

    def pop(self):
        return self.inputs.pop()
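
The effect of the stack can be sketched with a plain list of dictionaries. The function `visible()` and the qualified names in the example are hypothetical; the check mirrors `InputStack.include()`: with an empty stack, a value is compared against the original input, otherwise only against the parameters of the innermost call.

```python
def visible(value, original, stack):
    """Decide whether value should be tracked, given the current call stack."""
    if not (isinstance(value, str) and len(value) > 2):
        return False  # ignore non-strings and trivial strings
    if not stack:
        return value in original
    # Only fragments of the innermost call's parameters are considered
    return any(value in param for param in stack[-1].values())


original = 'http://www.fuzzingbook.org/#News'
stack = []
outside_call = visible('http', original, stack)

# Simulate two nested calls; the inner one only receives part of the input
stack.append({'outer:url': original})
stack.append({'inner:url': '//www.fuzzingbook.org/#News'})
netloc_visible = visible('www.fuzzingbook.org', original, stack)
scheme_visible = visible('http', original, stack)
print(outside_call, netloc_visible, scheme_visible)
```

Inside the inner call, `'http'` is no longer tracked, because it is not part of what that call received.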

### Restrict The Input Window

We wrap the dictionary of definitions so that `update()` only adds keys that are not yet defined; the first value recorded for a variable is the one that is kept.

In [None]:
class Vars(object):
    def __init__(self, i):
        self.defs = {START_SYMBOL: i}

    def update(self, v):
        self.defs.update({k: v for k, v in v.items() if k not in self.defs})
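
The same first-definition-wins behavior can be sketched with `dict.setdefault()`. The function `add_new_definitions()` and the variable names in the example are hypothetical.

```python
def add_new_definitions(defs, new):
    """Add entries from new to defs, keeping existing definitions untouched."""
    for key, value in new.items():
        defs.setdefault(key, value)  # no effect if key is already defined
    return defs


url = 'http://www.fuzzingbook.org/#News'
defs = {'<start>': url}
add_new_definitions(defs, {'urlparse:url': url})
# A later, different value for the same variable is ignored
add_new_definitions(defs, {'urlparse:url': '',
                           'urlsplit:netloc': 'www.fuzzingbook.org'})
print(defs)
```

Keeping the first value matters when a variable is later overwritten with a shorter or empty string while the input is being consumed.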

In [None]:
class StackTracker(Tracker):
    def __init__(self, inputstr, trace, **kwargs):
        self.istack = InputStack(inputstr)
        self.the_vars = Vars(inputstr)
        self.trace = trace
        self.options(kwargs)
        self.process()
        
    def options(self, kwargs):
        self.files = kwargs.get('files') or []
        # Use explicit defaults: `x or True` would ignore an explicit False
        self.track_params = kwargs.get('track_params', True)
        self.track_vars = kwargs.get('track_vars', True)
        self.track_return = kwargs.get('track_return', False)

    def include(self, var, value):
        if self.istack.ignored(value):
            return False
        return self.istack.include(var, value)

    def get_params(self, cxt, all_vars):
        return {
            "%s:%s" % (cxt.method, k): v
            for k, v in all_vars if k in cxt.parameter_names
        }

    def trace_event(self, event, arg, cxt, my_vars):
        if not any(cxt.file_name.endswith(f) for f in self.files):
            return
        if event == 'call':
            my_parameters = {
                k: v
                for k, v in self.get_params(cxt, my_vars).items()
                if not self.istack.ignored(v)
            }
            self.istack.push(my_parameters)
            if self.track_params:
                self.the_vars.update(my_parameters)
            return

        if event == 'return':
            self.istack.pop()
            if self.track_return:
                var = '(<-%s)' % cxt.method
                self.the_vars.update({var: arg})
            return

        if event == 'exception':
            return

        if self.track_vars:
            qvars = {"%s:%s" % (cxt.method, k): v for k, v in my_vars}
            my_vars = {
                var: value
                for var, value in qvars.items() if self.include(var, value)
            }
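            if not self.track_params:
                # Exclude the qualified names of the function's parameters
                param_names = {
                    "%s:%s" % (cxt.method, k) for k in cxt.parameter_names
                }
                my_vars = {
                    var: value
                    for var, value in my_vars.items() if var not in param_names
                }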
            self.the_vars.update(my_vars)

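The `Tracer` already records the event type of each trace entry. We can therefore process the recorded traces with `StackTracker`, which is aware of `call` and `return` events: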

In [None]:
traces = []
for inputstr in INPUTS:
    clear_cache()
    with Tracer(inputstr) as tracer:
        FUNCTION(tracer())
    sm = StackTracker(tracer.inputstr, tracer.trace, files=['urllib/parse.py'])
    traces.append((tracer.inputstr, sm))

Note that in the following we do not account for parameters getting reassigned values.

For each (VAR, VALUE) found:
* We search for occurrences of VALUE in the grammar
* We replace them by VAR
* We add a new rule VAR -> VALUE to the grammar

In [None]:
def get_derivation_tree(my_input, my_assignments, log=False):
    my_assignments = my_assignments.copy()
    tree = {}
    for var, value in my_assignments.items():
        nt_var = var if var == START_SYMBOL else nonterminal(var)
        logger(0, "%s = %s" % (nt_var, value), log)
        if tree:
            append = False
            for key, repl in tree.items():
                logger(1, "%s : %s" % (key, repl), log)
                if not any(value in t for t in repl):
                    continue
                new_arr = []
                for k, token in enumerate(repl):
                    if not value in token:
                        new_arr.append(token)
                    else:
                        append = True
                        arr = token.split(value)
                        new_arr.extend(
                            list(sum(zip(arr,
                                         len(arr) * [nt_var]), ()))[:-1])
                tree[key] = tuple(i for i in new_arr if i)
            if append:
                logger(0, "+%s = %s" % (nt_var, value), log)
                tree[nt_var] = set([value])
        else:
            tree[nt_var] = (value, )
    return  {key: values for key, values in tree.items()}

In [None]:
def recover_grammar(traces):
    merged_grammar = {}
    for inputstr, assignments in traces:
        tree = get_derivation_tree(inputstr, assignments)
        merged_grammar = add_tree(merged_grammar, tree)
    return merged_grammar

In [None]:
clear_cache()
with Tracer(INPUTS[2]) as tracer:
    FUNCTION(tracer())
sm = StackTracker(tracer.inputstr, tracer.trace, files=['urllib/parse.py'])
tree = get_derivation_tree(tracer.inputstr, sm.the_vars.defs)
for k, v in tree.items():
    print(k, v)

In [None]:
traces = []
for inputstr in INPUTS:
    clear_cache()
    with Tracer(inputstr) as tracer:
        FUNCTION(tracer())
    sm = StackTracker(tracer.inputstr, tracer.trace, files=['urllib/parse.py'])
    traces.append((tracer.inputstr, sm.the_vars.defs))
grammar = recover_grammar(traces)
for k,v in grammar.items():
    print(k, v)

In [None]:
syntax_diagram(grammar)

## Tainted Grammar Miner

In [None]:
from InformationFlow import tstr

In [None]:
class TaintedTracer(Tracer):
    def __init__(self, inputstr):
        self.inputstr = tstr(inputstr, parent=None)
        self.trace = []
        self.istack = TaintedInputStack(inputstr)
        self.vars = TaintedVars(inputstr)
  
    def include(self, k, v):
        return isinstance(repr(v), tstr)

In [None]:
class TaintedInputStack(InputStack):
    def has(self, val):
        return any(val.taint_in(var) for var in self.inputs[-1].values())
    
    def ignored(self, val):
        return not isinstance(repr(val), tstr)
    
    def include(self, k, val):
        if self.ignored(val):
            return False
        return self.has(val) if self.inputs else val.taint_in(self.original)

In [None]:
class TaintedVars(Vars):
    def trep(self, v):
        return v if isinstance(v, tstr) else repr(v)

    def update(self, v):
        self.defs.update(
            {k: self.trep(v)
             for k, v in v.items() if k not in self.defs})

In [None]:
class TaintedTracker(StackTracker):
    def __init__(self, inputstr, trace, **kwargs):
        self.istack = TaintedInputStack(inputstr)
        self.the_vars = TaintedVars(inputstr)
        self.trace = trace
        self.options(kwargs)
        self.process()

We can only replace a value if the taints match.

In [None]:
def get_derivation_tree(my_input, my_assignments, log=False):
    my_assignments = my_assignments.copy()
    tree = {}
    for var, value in my_assignments.items():
        nt_var = var if var == START_SYMBOL else nonterminal(var)
        logger(0, "%s = %s" % (nt_var, value), log)
        if tree:
            append = False
            for key, repl in tree.items():
                logger(1, "%s : %s" % (key, repl), log)
                if not any(value.taint_in(t) for t in repl if isinstance(t, tstr)):
                    continue
                new_arr = []
                for k, token in enumerate(repl):
                    if not isinstance(token, tstr) or not value.taint_in(token):
                        new_arr.append(token)
                    else:
                        append = True
                        arr = token.split(value)
                        new_arr.extend(
                            list(sum(zip(arr,
                                         len(arr) * [nt_var]), ()))[:-1])
                tree[key] = tuple(i for i in new_arr if i)
            if append:
                logger(0, "+%s = %s" % (nt_var, value), log)
                tree[nt_var] = set([value])
        else:
            tree[nt_var] = (value, )
    return  {key: values for key, values in tree.items()}
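
Why matching taints matters can be illustrated with a small sketch that tracks where each fragment came from. The class `Fragment` is a hypothetical stand-in and does not reproduce the `tstr` API; it only records the start offset of a string within the original input.

```python
class Fragment:
    """A piece of text together with its start offset in the original input."""

    def __init__(self, text, start=0):
        self.text = text
        self.start = start

    @property
    def end(self):
        return self.start + len(self.text)

    def slice(self, lo, hi):
        # A slice keeps track of where it came from
        return Fragment(self.text[lo:hi], self.start + lo)

    def inside(self, other):
        # True if this fragment covers a subrange of the other fragment
        return other.start <= self.start and self.end <= other.end


inp = Fragment('a=1&b=1')
first_value = inp.slice(2, 3)    # the '1' belonging to 'a'
second_value = inp.slice(6, 7)   # the '1' belonging to 'b'
left_pair = inp.slice(0, 3)      # 'a=1'

print(second_value.text in left_pair.text)  # plain string containment
print(second_value.inside(left_pair))       # origin-aware containment
```

Plain containment cannot tell the two `1` characters apart, whereas the origin-aware check only accepts the one that actually stems from `a=1`.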

In [None]:
def recover_grammar(traces):
    merged_grammar = {}
    for inputstr, assignments in traces:
        tree = get_derivation_tree(inputstr, assignments)
        merged_grammar = add_tree(merged_grammar, tree)
    return merged_grammar

In [None]:
traces = []
for inputstr in INPUTS:
    clear_cache()
    with TaintedTracer(inputstr) as tracer:
        FUNCTION(tracer())
    sm = TaintedTracker(tracer.inputstr, tracer.trace, files=['urllib/parse.py'])
    traces.append((tracer.inputstr, sm.the_vars.defs))
grammar = recover_grammar(traces)
syntax_diagram(grammar)

## Tainted Objects

While the functions we have seen so far use string parameters to pass fragments of input around, real-world parsers often pass around data structures that represent the input fragments. For the standard data containers in Python, one can rely on simple recursive filtering.

In [None]:
def taint_process(v):
    tv = type(v)
    if tv in {int, float, complex, str, bytes, bytearray, range}:
        # Atomic values (and ranges, which only hold integers) stay as they are
        return v
    elif tv in {set, frozenset, list, tuple}:
        return tv([taint_process(i) for i in v])
    elif tv is dict:  # or hasattr(v, '__dict__')
        return {i: taint_process(v[i]) for i in v}
    else:
        return repr(v)

One way to account for custom data structures other than containers is to rely on their `repr()`. That is, both `str()` and `repr()` rely on string methods that we have overridden in the tainted string. Hence, if any of the string fragments are tainted, the returned string will also be tainted.

In [None]:
class TaintedInputStack(TaintedInputStack):
    def has(self, val):
        return any(val.taint_in(var) for var in self.inputs[-1].values())

    def push(self, inputs):
        tainted = {
            k: repr(v)
            for k, v in inputs.items() if isinstance(repr(v), tstr)
        }
        if not self.inputs:
            my_inputs = tainted
        else:
            my_inputs = {k: v for k, v in tainted.items() if self.has(v)}
        self.inputs.append(my_inputs)

    def pop(self):
        return self.inputs.pop()

One of the choices here is whether to track the input parameters as variables (not just as input parameters) or only the local variable values.

In [None]:
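class ConfigurableTracker(StackTracker):
    def __init__(self, inputstr, trace, **kwargs):
        # Set up the stack and the variable store before processing the trace
        self.istack = InputStack(inputstr)
        self.the_vars = TaintedVars(inputstr)
        self.trace = trace
        self.options(kwargs)
        self.process()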

### Accounting for reassignments

## Lessons Learned

* Given a set of inputs, we can learn an input grammar by examining variable values during execution.
* The resulting grammars can be used directly for fuzzing.

## Next Steps


* [use _mutations_ on existing inputs to get more valid inputs](MutationFuzzer.ipynb)
* [use _grammars_ (i.e., a specification of the input format) to get even more valid inputs](Grammars.ipynb)
* [reduce _failing inputs_ for efficient debugging](Reducer.ipynb)


## Background

\cite{Lin2008}
