## Evaluating ConcolicGrammarFuzzer

In this chapter, we will evaluate our concolic fuzzer that uses grammars. Namely, we want to compare  `ConcolicGrammarFuzzer` and `GrammarFuzzer`. We are mainly interested in **speed**, **line coverage**, **branch coverage** and **number of valid inputs** achieved by these fuzzers.

We will fuzz a simple Python program with both fuzzers and compare their success. Note that the features of `ConcolicGrammarFuzzer` is limited by symbolic execution and the SMT solver used ([Z3](https://github.com/Z3Prover/z3)), so we restricted our target to be a simple program. We believe this evaluation could be extended to more complex programs (which would lead to amore comprehensive evaluation of our concolic fuzzer) when more features are added to `ConcolicGrammarFuzzer` or Z3.

In [None]:
import fuzzingbook_utils

In [None]:
from ConcolicFuzzer import ConcolicGrammarFuzzer, ConcolicTracer
from GrammarFuzzer import GrammarFuzzer

## Target Program: `query_db`

Our target program `query_db`, which is defined as below. This simple function mimics querying an SQL databese. We mock an SQL database using the `ConcolicDB` and its grammar `INVENTORY_GRAMMAR` from the [InformationFlow](InformationFlow.ipynb) chapter.

We should point some important aspects of `query_db`:
* The databese is initialized to `INITIAL_DB` in each execution. This is done mainly because we want the database to be in the same state for each query created by the fuzzers so that our results can be reproduced easily. Later on, we will also test our fuzzers with different `INITIAL_DB` values.


* If the query is not valid, our target throws an `SQLException`. For example, if the table targeted by query doesn't exist in the database, we would get an `SQLException`. This makes the job of fuzzers harder because they need to come up with queries which have specific table or field names. Also, note that because of this, we expect the `ConcolicGrammarFuzzer` to have more success since it should be able to find out these specific values with symbolic execution and update its grammar to produce better queries.

In [None]:
from InformationFlow import INVENTORY_GRAMMAR, SQLException
from ConcolicFuzzer import ConcolicDB
from copy import deepcopy

INITIAL_DB = [
    (
        'vehicles',
        (
            {'year': int, 'kind': str, 'company': str, 'model': str},
            [
                {'year': 1997, 'kind': 'van', 'company': 'Ford', 'model': 'E350'},
                {'year': 2000, 'kind': 'car', 'company': 'Mercury', 'model': 'Cougar'},
                {'year': 1999, 'kind': 'car', 'company': 'Chevy', 'model': 'Venture'}
            ]
        )
    ),
    (
        'months',
        (
            {'month': int, 'name': str},
            [
                {'month': 1, 'name': 'jan'},
                {'month': 2, 'name': 'feb'},
                {'month': 3, 'name': 'mar'},
                {'month': 4, 'name': 'apr'},
                {'month': 5, 'name': 'may'},
                {'month': 6, 'name': 'jun'},
                {'month': 7, 'name': 'jul'},
                {'month': 8, 'name': 'aug'},
                {'month': 9, 'name': 'sep'},
                {'month': 10, 'name': 'oct'},
                {'month': 11, 'name': 'nov'},
                {'month': 12, 'name': 'dec'}
            ]
        )
    )
]

def query_db(query):
    global INITIAL_DB
    concolic_db = ConcolicDB()
    concolic_db.db = deepcopy(INITIAL_DB)
    return concolic_db.sql(query)

## Calculating Coverage


In [None]:
import sys

class FilteredCoverage:
    def __init__(self, functions=set()):
        self._traces = []
        self._line_coverage = set()
        self._branch_coverage = set()
        self._functions = functions
    
    def __enter__(self):
        self.original_tracer = sys.gettrace()
        sys.settrace(self._tracer)
        return self

    def __exit__(self, exc_type, exc_value, tb):
        sys.settrace(self.original_tracer)
        
    def _tracer(self, frame, event, arg):
        if self.original_tracer is not None:
            self.original_tracer(frame, event, arg)
            
        should_trace = (event == 'line' and 
                        frame.f_code.co_name in self._functions)
        
        if should_trace:
            function_name = frame.f_code.co_name
            line_no = frame.f_lineno
            trace = (function_name, line_no)
            
            if len(self._traces) > 0:
                last_trace = self._traces[-1]
                if last_trace[0] == function_name:
                    self._branch_coverage.add((last_trace, trace))
                
            self._traces.append(trace)
            self._line_coverage.add(trace)

        return self._tracer
    
    def get_traces(self):
        return self._traces

    def get_line_coverage(self):
        return self._line_coverage
    
    def get_branch_coverage(self):
        return self._branch_coverage

In [None]:
from collections import defaultdict

class Result:
    def __init__(
            self, 
            functions,
            fuzzer_name,
            line_coverage, 
            branch_coverage, 
            exceptions, 
            valid_inputs,
            elapsed_time):
        
        self.functions = functions
        self.fuzzer_name = fuzzer_name
        
        self.line_coverage = line_coverage
        self.branch_coverage = branch_coverage
        self.exceptions = exceptions
        self.valid_inputs = valid_inputs
        
        self.n_lines_covered = len(line_coverage)
        self.n_branches_covered = len(branch_coverage)
        self.n_exceptions = len(exceptions)
        self.n_valid_inputs = len(valid_inputs)
        
        self.elapsed_time = elapsed_time
        
        self.line_coverage_dict = self._generate_line_coverage_dict()
        self.branch_coverage_dict = self._generate_branch_coverage_dict()
        self.exceptions_dict = self._generate_exceptions_dict()

    def _generate_line_coverage_dict(self):
        line_coverage_dict = defaultdict(list)
        for function_name, line in self.line_coverage:
            line_coverage_dict[function_name].append(line)
        return line_coverage_dict

    def _generate_branch_coverage_dict(self):
        branch_coverage_dict = defaultdict(list)
        for branch in self.branch_coverage:
            function_name = branch[0][0]
            branch_coverage_dict[function_name].append(branch)
        return branch_coverage_dict
            
    def _generate_exceptions_dict(self):
        exceptions_dict = defaultdict(list)
        for e in self.exceptions:
            e_type = type(e).__name__
            exceptions_dict[e_type].append(e)
        return exceptions_dict

In [None]:
import matplotlib.pyplot as plt
import numpy as np

class ResultComparator:
    def __init__(self, result1, result2):
        self._result1 = result1
        self._result2 = result2
        
        self._name1 = result1.fuzzer_name
        self._name2 = result2.fuzzer_name
        
        assert(result1.functions == result2.functions)
        self._functions = result1.functions
 
    def _print_formatted(self, arg1, arg2, arg3):
        print('{0: <30} {1: <25} {2: <25}'.format(arg1, arg2, arg3))
        
    def _print_title(self, title):
        print('\n')
        self._print_formatted(title, self._name1, self._name2)
        print('-' * 80)
        
    def _print_coverage_comparison(self, cov_dict1, n1, cov_dict2, n2, title):
        self._print_title(title)
        self._print_formatted('all functions', n1, n2)
        for f in self._functions:
            n_lines1 = len(cov_dict1[f])
            n_lines2 = len(cov_dict2[f])
            self._print_formatted(f, n_lines1, n_lines2)
    
    def _plot_coverage_comparison(self, cov_dict1, n1, cov_dict2, n2, title):
        x_indexes = [i for i in range(len(self._functions) + 1)]
        x_labels = ['all functions'] + list(self._functions)

        plt.title(title)
        plt.bar([x-0.1 for x in x_indexes], 
                [n1] + [len(cov_dict1[f]) for f in self._functions],  
                width=0.2, align='center', label=self._name1)

        plt.bar([x+0.1 for x in x_indexes], 
                [n2] + [len(cov_dict2[f]) for f in self._functions],  
                width=0.2, align='center', label=self._name2)

        plt.legend(loc='upper right')
        plt.xticks(x_indexes, x_labels, rotation=20)
        plt.show()
        
    def compare_line_coverage(self):
        args = [
            self._result1.line_coverage_dict,
            self._result1.n_lines_covered,
            self._result2.line_coverage_dict,
            self._result2.n_lines_covered,
            'Line Coverage'
        ]
        
        self._print_coverage_comparison(*args)
        self._plot_coverage_comparison(*args)
    
    def compare_branch_coverage(self):
        args = [
            self._result1.branch_coverage_dict,
            self._result1.n_branches_covered,
            self._result2.branch_coverage_dict,
            self._result2.n_branches_covered,
            'Branch Coverage'
        ]
        
        self._print_coverage_comparison(*args)
        self._plot_coverage_comparison(*args)
    
    def compare_exceptions(self):
        e_dict1 = self._result1.exceptions_dict
        e_dict2 = self._result2.exceptions_dict
        all_e_types = e_dict1.keys() | e_dict2.keys()

        self._print_title('Exception Type')
    
        for e_type in all_e_types:
            n1 = len(e_dict1[e_type])
            n2 = len(e_dict2[e_type])
            self._print_formatted(e_type, n1, n2)

    def compare_valid_inputs(self):
        n1 = self._result1.n_valid_inputs
        n2 = self._result2.n_valid_inputs
        
        self._print_title('Number of Valid Inputs')
        self._print_formatted('', n1, n2)
    
    def compare_elapsed_time(self):
        t1 = round(self._result1.elapsed_time, 2)
        t2 = round(self._result2.elapsed_time, 2)
        
        self._print_title('Elapsed Time (s)')
        self._print_formatted('', t1, t2)
        
    def compare_all(self):
        self.compare_line_coverage()
        self.compare_branch_coverage()
        self.compare_exceptions()
        self.compare_valid_inputs()
        self.compare_elapsed_time()

## Runners

In [None]:
import time

In [None]:
def fuzz_n_times(target, functions, fuzzer, n):
    fuzzer_name = fuzzer.__class__.__name__
    
    def run_target(inp):
        if 'concolic' in fuzzer_name.lower():
            with ConcolicTracer() as tracer:
                try:
                    return tracer[target](inp)
                except Exception as e:
                    raise e
                finally:
                    fuzzer.update_grammar(tracer)
        else:
            return target(inp)
    
    line_coverage = set()
    branch_coverage = set()
    exceptions = []
    valid_inputs = []
    
    start_time = time.time()
    
    for i in range(n):
        print('{0:<30}-> fuzz {1} of {2}'.format(fuzzer_name, i, n), end='\r')
        inp = fuzzer.fuzz()
        with FilteredCoverage(functions) as fc:
            try:
                run_target(inp)
                valid_inputs.append(inp)
            except Exception as e:
                exceptions.append(e)

        line_coverage |= fc.get_line_coverage()
        branch_coverage |= fc.get_branch_coverage()
        
    elapsed_time = time.time() - start_time

    print('{0:<30}-> fuzzed {1} times in {2:.2f}s !'.format(fuzzer_name, n, elapsed_time))
    
    return Result(functions,
                  fuzzer_name,
                  line_coverage, 
                  branch_coverage, 
                  exceptions,
                  valid_inputs,
                  elapsed_time)

In [None]:
def run_grammar_fuzzer(
        target, 
        grammar, 
        max_nonterminals,
        n_fuzz,
        functions): 
    
    gf = GrammarFuzzer(grammar=grammar, 
                       max_nonterminals=max_nonterminals)
    
    result = fuzz_n_times(target=target, 
                          functions=functions, 
                          fuzzer=gf, 
                          n=n_fuzz)
    
    return result

In [None]:
def run_concolic_grammar_fuzzer(
        target, 
        grammar, 
        max_nonterminals, 
        n_fuzz,
        functions):
    
    cgf = ConcolicGrammarFuzzer(grammar=grammar,
                                max_nonterminals=max_nonterminals)

    #tokens_to_prune = ['<value>', '<table>', '<column>', 
    #                   '<literals>', '<exprs>', '<bexpr>'] 
    
    tokens_to_prune = [] 
    
    cgf.prune_tokens(tokens_to_prune)
    
    result = fuzz_n_times(target=target, 
                          functions=functions, 
                          fuzzer=cgf, 
                          n=n_fuzz)
    
    return result

## Fuzzing Configuration

In [None]:
N_FUZZ_DEFAULT = 10
TARGET_DEFAULT = query_db
GRAMMAR_DEFAULT = INVENTORY_GRAMMAR
MAX_NONTERMINALS_DEFAULT = 10
FUNCTIONS_DEFAULT = {
    'sql', 
    'table',
    'do_update', 
    'do_insert', 
    'do_delete', 
    'query_db'
}

# Evaluation

## Comparative Run

In [None]:
def comperative_run(runner1, config1, runner2, config2):
    result1 = runner1(**config1)
    result2 = runner2(**config2)
    return result1, result2

In [None]:
def compare_results(result1, result2):
    result_comparator = ResultComparator(result1, result2)    
    result_comparator.compare_all()

In [None]:
config = {
    'target': TARGET_DEFAULT, 
    'grammar': GRAMMAR_DEFAULT, 
    'max_nonterminals': MAX_NONTERMINALS_DEFAULT, 
    'n_fuzz': 1000,
    'functions': FUNCTIONS_DEFAULT
}

result1, result2 = comperative_run(
    runner1=run_concolic_grammar_fuzzer,
    config1=config,
    runner2=run_grammar_fuzzer,
    config2=config
)

compare_results(result1, result2)

In [None]:
config = {
    'target': TARGET_DEFAULT, 
    'grammar': GRAMMAR_DEFAULT, 
    'max_nonterminals': MAX_NONTERMINALS_DEFAULT, 
    'n_fuzz': 1000,
    'functions': FUNCTIONS_DEFAULT
}

result1, result2 = comperative_run(
    runner1=run_concolic_grammar_fuzzer,
    config1=config,
    runner2=run_grammar_fuzzer,
    config2=config
)

compare_results(result1, result2)

## under construction

In [None]:
from urllib.parse import urlparse
from Grammars import URL_GRAMMAR

config = {
    'target': urlparse, 
    'grammar': URL_GRAMMAR, 
    'max_nonterminals': MAX_NONTERMINALS_DEFAULT, 
    'n_fuzz': 100,
    'functions': {
        'urlparse',
         '_splitnetloc',
         'urlsplit',
         '_checknetloc',
         '_noop',
    }
}

result1, result2 = comperative_run(
    runner1=run_concolic_grammar_fuzzer,
    config1=config,
    runner2=run_grammar_fuzzer,
    config2=config
)

compare_results(result1, result2)

In [None]:
def sum_results(results):
    assert (len(results) > 1)
    
    functions = sorted(results[0].functions)
    fuzzer_name = results[0].fuzzer_name
    line_coverage = set()
    branch_coverage = set()
    exceptions = []
    elapsed_time = 0
    
    assert (all(sorted(r.functions) == functions for r in results))
    assert (all(r.fuzzer_name == fuzzer_name for r in results))
    
    for result in results:
        line_coverage |= result.line_coverage
        branch_coverage |= result.branch_coverage
        exceptions += result.exceptions
        elapsed_time += result.elapsed_time
        
    return Result(functions,
                  fuzzer_name,
                  line_coverage, 
                  branch_coverage, 
                  exceptions, 
                  elapsed_time)

In [None]:
def run_fuzzer(fuzzer_class, fuzzer_config, target, functions, n_fuzz):
    fuzzer = fuzzer_class(**fuzzer_config)  
    fuzzer_name = fuzzer.__class__.__name__
    
    if 'concolic' in fuzzer_name.lower():
        tokens_to_prune = ['<value>', '<table>', '<column>', 
                           '<literals>', '<exprs>', '<bexpr>'] 
        fuzzer.prune_tokens(tokens_to_prune)
    else:
        assert ('grammar' in fuzzer_name.lower())
    
    result = fuzz_n_times(target=target, 
                          functions=functions, 
                          fuzzer=fuzzer, 
                          n=n_fuzz)
    
    return result