# Learning from Failures

Given the many executions we can generate, it is only natural that these executions would also be subject to _machine learning_ in order to learn which features of the input (or the execution) would be associated with failures.

In this chapter, we study the _Alhazen_ approach, one of the first of this kind.
Alhazen by Kampmann et al. [[KHSZ20](https://publications.cispa.saarland/3107/7/fse2020-alhazen.pdf)] automatically learns the associations between the failure of a program and _features of the input data_, say "The error occurs whenever the `<expr>` element is negative".


This chapter is based on an Alhazen implementation and exercise contributed by [Martin Eberlein](https://martineberlein.github.io), TU Berlin. Thanks a lot, Martin!

In [None]:
# from bookutils import YouTubeVideo
# YouTubeVideo("w4u5gCgPlmg")

**Prerequisites**

* This chapter extends the ideas from [the chapter on Generalizing Failure Circumstances](DDSetDebugger.ipynb).

In [None]:
import bookutils.setup

In [None]:
from typing import List, Tuple, Dict, Any, Optional

## Synopsis

<!-- Automatically generated. Do not edit. -->





## Alhazen in a Nutshell

When diagnosing why a program fails, the first step is to determine the circumstances under which the program failed. Kampmann et al. [[KHSZ20](https://publications.cispa.saarland/3107/7/fse2020-alhazen.pdf)] presented an approach to automatically discover the circumstances of program behavior.
Their approach associates the program’s failure with the syntactical features of the input data, allowing them to learn and extract the properties that result in the specific behavior.

Their reference implementation _Alhazen_ can generate a diagnosis and explain why, for instance, a particular bug occurs.
More formally, Alhazen forms a hypothetical model based on the observed inputs.
Additional test inputs are generated and executed to refine or refute the hypothesis, eventually obtaining a prediction model of the circumstances of why the behavior in question takes place.
Alhazen uses a _Decision Tree classifier_ to learn the association between the program behavior and the input features.

![title](PICS/Alhazen.png)
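
The hypothesize, test, and refine cycle described above can be illustrated with a deliberately tiny sketch. It is not Alhazen itself: instead of a decision tree over grammar features, the "hypothesis" here is just an integer interval, and both `program_fails()` and `refine()` are hypothetical names introduced for this illustration.

```python
def program_fails(x: int) -> bool:
    """Hypothetical program under test: it fails on a hidden interval."""
    return -42 <= x <= -12


def refine(seed: int, max_rounds: int = 100):
    """Grow an interval hypothesis around one known failing input."""
    # Initial hypothesis: the failure occurs exactly at the observed input
    low = high = seed
    for _ in range(max_rounds):
        # Generate new inputs just outside the hypothesized range and run them
        fails_below = program_fails(low - 1)
        fails_above = program_fails(high + 1)
        if not (fails_below or fails_above):
            break  # the new inputs did not refute the hypothesis
        # The hypothesis was too narrow: refine it
        if fails_below:
            low -= 1
        if fails_above:
            high += 1
    return low, high


print(refine(-16))  # (-42, -12)
```

Each round generates inputs that could refute the current hypothesis, and the loop stops once no such input changes the picture.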

The tool is named after [Ḥasan Ibn al-Haytham](https://en.wikipedia.org/wiki/Ibn_al-Haytham) (latinized name: Alhazen).
Often referred to as the "Father of modern optics", Ibn al-Haytham made significant contributions to the principles of optics and visual perception.
Most notably, he was an early proponent of the concept that a hypothesis must be supported by experiments, and thus
one of the inventors of the _scientific method_, the key process in the Alhazen tool.

## Motivation

**Info:** We use the functionality provided by [The Fuzzing Book](https://www.fuzzingbook.org).
For a more detailed description of grammars, have a look at the chapter ["Fuzzing with Grammars"](https://www.fuzzingbook.org/html/Grammars.html).

In [None]:
from fuzzingbook.Grammars import Grammar, EXPR_GRAMMAR, reachable_nonterminals, is_valid_grammar
from fuzzingbook.GrammarFuzzer import GrammarFuzzer, expansion_to_children, DerivationTree, tree_to_string, display_tree, is_nonterminal
from fuzzingbook.Parser import EarleyParser

In [None]:
import pandas
import numpy
import matplotlib

In [None]:
CALC_GRAMMAR: Grammar = {
    "<start>":
        ["<function>(<term>)"],

    "<function>":
        ["sqrt", "tan", "cos", "sin"],

    "<term>": ["-<value>", "<value>"],

    "<value>":
        ["<integer>.<integer>",
         "<integer>"],

    "<integer>":
        ["<digit><integer>", "<digit>"],

    "<digit>":
        ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]
}

We see that the grammar `CALC_GRAMMAR` consists of several production rules. The calculator subject will only accept inputs that conform to this grammar definition.
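
To get a feel for the inputs this grammar describes, the following minimal sketch expands it at random. It is only an illustration: the helper `expand()` and its depth limit are assumptions made here, and the grammar is copied so that the block runs on its own. The chapter itself relies on `GrammarFuzzer` from The Fuzzing Book.

```python
import random
import re

# A copy of the calculator grammar, so that this sketch is self-contained
GRAMMAR = {
    "<start>": ["<function>(<term>)"],
    "<function>": ["sqrt", "tan", "cos", "sin"],
    "<term>": ["-<value>", "<value>"],
    "<value>": ["<integer>.<integer>", "<integer>"],
    "<integer>": ["<digit><integer>", "<digit>"],
    "<digit>": list("0123456789"),
}

NONTERMINAL = re.compile(r"<[^<> ]+>")


def expand(symbol: str, rng: random.Random, depth: int = 0) -> str:
    """Recursively replace nonterminals by a randomly chosen alternative."""
    alternatives = GRAMMAR[symbol]
    if depth > 5:
        # Deep in the tree, avoid directly recursive alternatives to terminate
        alternatives = [a for a in alternatives if symbol not in a]
    expansion = rng.choice(alternatives)
    return NONTERMINAL.sub(
        lambda match: expand(match.group(0), rng, depth + 1), expansion)


rng = random.Random(0)
samples = [expand("<start>", rng) for _ in range(5)]
print(samples)
```

Every generated string has the shape `function(number)`, with an optional minus sign and an optional fractional part.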

Now, let's load two initial input samples:

In [None]:
# Load initial input files
initial_sample_list = ['sqrt(-16)', 'sqrt(4)']

The two initial input samples for our calculator should be:
- _sqrt(-16)_
- _sqrt(4)_

We can check that this is true with Python's `assert` statement. The condition holds if no `AssertionError` is raised.
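
Such a check might look as follows. This is a minimal sketch that repeats the sample list so that it runs on its own:

```python
initial_sample_list = ['sqrt(-16)', 'sqrt(4)']

# An assert statement passes silently if its condition holds
# and raises an AssertionError otherwise
assert initial_sample_list == ['sqrt(-16)', 'sqrt(4)']
assert len(initial_sample_list) == 2
```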

Now, let's execute our two input samples and observe the calculator's behavior. To do this, we use the function `execute_samples`, which is defined below. We can call the function with a list of input samples, and it returns the corresponding execution outcome (label/oracle). The output is a [pandas dataframe](https://pandas.pydata.org/docs/reference/frame.html), and the labels are from the class `OracleResult`.

Next, we implement the function `sample_runner(sample)`, which lets us execute the calculator for a single sample. `sample_runner(sample)` returns the `OracleResult` for the sample.

In [None]:
from enum import Enum

In [None]:
class OracleResult(Enum):
    BUG = "BUG"
    NO_BUG = "NO_BUG"
    UNDEF = "UNDEF"

    def __str__(self):
        return self.value

In [None]:
"""
This file contains the code under test for the example bug.
The sqrt() function fails for -42 <= x <= -12.
"""
from math import tan as rtan
from math import cos as rcos
from math import sin as rsin


def task_sqrt(x):
    """Computes the square root of x, using the Newton-Raphson method"""
    if x <= -12 and x >= -42:
        x = 0
    else:
        x = 1
    x = max(x, 0)
    approx = None
    guess = x / 2
    while approx != guess:
        approx = guess
        guess = (approx + x / approx) / 2
    return approx


def task_tan(x):
    return rtan(x)


def task_cos(x):
    return rcos(x)


def task_sin(x):
    return rsin(x)

In [None]:
import sys

SUBJECT = "calculator"

def sample_runner(sample):
    testcode = sample

    try:
        exec(testcode, {"sqrt": task_sqrt, "tan": task_tan, "sin": task_sin, "cos": task_cos}, {})
        return OracleResult.NO_BUG
    except ZeroDivisionError:
        return OracleResult.BUG
    except Exception as e:
        print(e, file=sys.stderr)
        return OracleResult.UNDEF

Let's test the function:

In [None]:
sample = "sqrt(-16)"
sample_runner(sample)

As expected, the sample `sqrt(-16)` triggers the calculator bug. Let's try some more samples:

In [None]:
assert sample_runner("sqrt(-23)") == OracleResult.BUG
assert sample_runner("sqrt(44)") == OracleResult.NO_BUG
assert sample_runner("cos(-9)") == OracleResult.NO_BUG

What happens if we pass inputs to the calculator that do not conform to its input format?

In [None]:
sample_runner("undef_function(QUERY)")

The function `sample_runner(sample)` returns an `OracleResult.UNDEF` whenever the runner is not able to execute the sample.

Finally, we provide the function `execute_samples(sample_list)` that obtains the oracle/label for a list of samples.

In [None]:
# Executes a list of samples and return the execution outcome (label)
# The function returns a pandas dataframe
def execute_samples(sample_list):
    data = []
    for sample in sample_list:
        result = sample_runner(sample)
        data.append({"oracle": result })

    return pandas.DataFrame.from_records(data)

In [None]:
# let us define a list of samples to execute
sample_list = ["sqrt(-20)", "cos(2)", "sqrt(-100)", "undef_function(foo)"]

In [None]:
# we obtain the execution outcome
labels = execute_samples(sample_list)
display(labels)

In [None]:
# combine with the sample_list
for i, row in enumerate(labels['oracle']):
    print(sample_list[i].ljust(30) + str(row))

To remove the undefined input samples, you could invoke something similar to this:

In [None]:
# clean up data
clean_data = labels.drop(labels[labels.oracle.astype(str) == "UNDEF"].index)
display(clean_data)

In [None]:
# Load function execute samples
# execute_samples(List[str])
oracle = execute_samples(sample_list)
oracle

In [None]:
# Combined sample and labels by iterating over the obtained oracle
for i, row in enumerate(oracle['oracle']):
    print(sample_list[i].ljust(30) + str(row))

We observe that the sample `sqrt(-16)` triggers a bug in the calculator, whereas the sample `sqrt(4)` does not show unusual behavior. Of course, we want to know why the sample causes the program to fail. In a typical use case, the developers of the calculator program would now try other input samples and evaluate whether similar inputs also trigger the program's failure. Let's try some more input samples; maybe we can refine our understanding of why the calculator crashes:

In [None]:
# Our guesses (maybe the failure is also in the cos or tan function?)
guess_samples = ['cos(-16)', 'tan(-16)', 'sqrt(-100)', 'sqrt(-20.23412431234123)']

# let's obtain the execution outcome for each of our guesses
guess_oracle = execute_samples(guess_samples)

# let's show the results
for i, row in enumerate(guess_oracle['oracle']):
    print(guess_samples[i].ljust(30) + str(row))

It looks like the failure occurs only in the `sqrt` function, and even there only for specific `x` values. We could now try other values for `x` and repeat the process. However, this would be highly time-consuming and not an efficient debugging technique for a larger and more complex test subject.
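
To illustrate what "trying other values" amounts to, here is a minimal brute-force sketch. The function `buggy_sqrt()` is a stand-in that repeats the body of `task_sqrt()` from above so that the block runs on its own; the helper `fails()` and the chosen range of integers are assumptions made for this illustration.

```python
def buggy_sqrt(x):
    """Stand-in for task_sqrt() above, repeated for self-containment."""
    if x <= -12 and x >= -42:
        x = 0
    else:
        x = 1
    x = max(x, 0)
    approx = None
    guess = x / 2
    while approx != guess:
        approx = guess
        guess = (approx + x / approx) / 2
    return approx


def fails(x) -> bool:
    """Returns True if buggy_sqrt(x) raises a ZeroDivisionError."""
    try:
        buggy_sqrt(x)
        return False
    except ZeroDivisionError:
        return True


# Manually sweep a range of integer arguments
failing = [x for x in range(-100, 101) if fails(x)]
print(f"{len(failing)} failing integers, from {min(failing)} to {max(failing)}")
```

Such a sweep works for a single numeric argument, but it only covers the values we thought of trying (here: integers in a fixed range), and it does not carry over to inputs with richer structure.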

Wouldn't it be great if there was a tool that automatically does this for us? This is exactly what _Alhazen_ is used for. It helps us explain why specific inputs cause a program to fail.

## The Alhazen Algorithm

_Alhazen_ is a tool that automatically learns the circumstances of program failure by associating syntactical features of sample inputs with the execution outcome. The produced explanations (in the form of a decision tree) help developers focus on the input space's relevant aspects.

## Step 1: Extracting Features

In this section, we are concerned with the problem of extracting syntactic features from inputs. In particular, Alhazen defines various features based on the input grammar, such as *existence* and *numeric interpretation*. These features are then extracted from the parse trees of the inputs (see [Section 3 of the paper](https://publications.cispa.saarland/3107/7/fse2020-alhazen.pdf) for more details).

The implementation of the feature extraction module consists of the following three tasks:
1. Implementation of individual feature classes, whose instances allow us to derive specific feature values from inputs
2. Extraction of features from the grammar through instantiation of the aforementioned feature classes
3. Computation of feature vectors from a set of inputs, which will then be used as input for the decision tree

### Feature Classes

In [None]:
from abc import ABC, abstractmethod

class Feature(ABC):
    '''
    The abstract base class for grammar features.

    Args:
        name : A unique identifier name for this feature. Should not contain Whitespaces.
               e.g., 'type(<feature>@1)'
        rule : The production rule (e.g., '<function>' or '<value>').
        key  : The feature key (e.g., the chosen alternative or rule itself).
    '''

    def __init__(self, name: str, rule: str, key: str, /,
                 friendly_name: Optional[str] = None) -> None:
        self.name = name
        self.rule = rule
        self.key = key
        self._friendly_name = friendly_name or name
        super().__init__()

    def __repr__(self) -> str:
        '''Returns a printable string representation of the feature.'''
        return self.name_rep()

    @abstractmethod
    def name_rep(self) -> str:
        pass

    def friendly_name(self) -> str:
        return self._friendly_name

    @abstractmethod
    def get_feature_value(self, derivation_tree) -> float:
        '''Returns the feature value for a given derivation tree of an input.'''
        pass

    def replace(self, new_key: str) -> 'Feature':
        '''Returns a new feature with the same name but a different key.'''
        return self.__class__(self.name, self.rule, new_key)

In [None]:
class ExistenceFeature(Feature):
    '''
    This class represents existence features of a grammar. Existence features indicate
    whether a particular production rule was used in the derivation sequence of an input.
    For a given production rule P -> A | B, a production existence feature for P and
    alternative existence features for each alternative (i.e., A and B) are defined.

    name : A unique identifier name for this feature. Should not contain Whitespaces.
           e.g., 'exist(<digit>@1)'
    rule : The production rule.
    key  : The feature key, equal to the rule attribute for production features,
           or equal to the corresponding alternative for alternative features.
    '''
    def __init__(self, name: str, rule: str, key: str,
                 friendly_name: Optional[str] = None) -> None:
        super().__init__(name, rule, key, friendly_name=friendly_name)

    def name_rep(self) -> str:
        if self.rule == self.key:
            return f"exists({self.rule})"
        else:
            return f"exists({self.rule} == {self.key})"

    def get_feature_value(self, derivation_tree: DerivationTree) -> float:
        '''Returns 1 if this feature is matched anywhere in the derivation tree, else 0.'''
        (node, children) = derivation_tree

        # The local match (1 if the feature is matched for the current node, 0 if not)
        count = 0

        # First check if the current node can be matched with the rule
        if node == self.rule:

            # Production existence feature
            if self.rule == self.key:
                count = 1

            # Production alternative existence feature
            # We compare the children of the expansion with the actual children
            else:
                expansion_children = [c[0] for c in expansion_to_children(self.key)]
                node_children = [c[0] for c in children]
                if expansion_children == node_children:
                    count = 1

        # Recursively check all children; the feature exists if it is matched anywhere
        for child in children:
            count = max(count, self.get_feature_value(child))

        return count

In [None]:
from numpy import nanmax, isnan

class NumericInterpretation(Feature):
    '''
    This class represents numeric interpretation features of a grammar. These features
    are defined for productions that only derive words composed of the characters
    [0-9], '.', and '-'. The returned feature value corresponds to the maximum
    floating-point number interpretation of the derived words of a production.

    name : A unique identifier name for this feature. Should not contain Whitespaces.
           e.g., 'num(<integer>)'
    rule : The production rule.
    '''
    def __init__(self, name: str, rule: str, /,
                 friendly_name: Optional[str] = None) -> None:
        super().__init__(name, rule, rule, friendly_name=friendly_name)

    def name_rep(self) -> str:
        return f"num({self.key})"

    def get_feature_value(self, derivation_tree: DerivationTree) -> float:
        '''Determines the maximum float of this feature in the derivation tree.'''
        (node, children) = derivation_tree

        value = float('nan')
        if node == self.rule:
            try:
                value = float(tree_to_string(derivation_tree))
            except ValueError:
                # The derived word is not a valid number; keep NaN
                pass

        # Return maximum value encountered in tree, ignoring all NaNs
        tree_values = [value] + [self.get_feature_value(c) for c in children]
        if all(isnan(tree_values)):
            return value
        else:
            return nanmax(tree_values)
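
To see what these two kinds of features compute, the following sketch evaluates them on a hand-written derivation tree for `sqrt(-16)`. It is a simplified illustration under stated assumptions: the tree uses the `(symbol, children)` format from above, and the standalone helpers `to_string()`, `exists()`, and `max_number()` are hypothetical stand-ins that mirror the logic of the classes without depending on them. Only production existence features are covered, not alternatives.

```python
import math

# Hand-written derivation tree for "sqrt(-16)" in (symbol, children) format
TREE = ("<start>", [
    ("<function>", [("sqrt", [])]),
    ("(", []),
    ("<term>", [
        ("-", []),
        ("<value>", [
            ("<integer>", [
                ("<digit>", [("1", [])]),
                ("<integer>", [("<digit>", [("6", [])])]),
            ]),
        ]),
    ]),
    (")", []),
])


def to_string(tree) -> str:
    """Concatenates all leaf symbols of the tree."""
    symbol, children = tree
    return symbol if not children else "".join(to_string(c) for c in children)


def exists(tree, rule: str) -> int:
    """Returns 1 if the nonterminal `rule` occurs anywhere in the tree, else 0."""
    symbol, children = tree
    if symbol == rule:
        return 1
    return max((exists(c, rule) for c in children), default=0)


def max_number(tree, rule: str) -> float:
    """Returns the largest number derived from `rule`, or NaN if there is none."""
    symbol, children = tree
    values = [max_number(c, rule) for c in children]
    if symbol == rule:
        try:
            values.append(float(to_string(tree)))
        except ValueError:
            pass  # the derived word is not a number
    values = [v for v in values if not math.isnan(v)]
    return max(values) if values else math.nan


print(to_string(TREE))                # sqrt(-16)
print(exists(TREE, "<function>"))     # 1
print(max_number(TREE, "<term>"))     # -16.0
print(max_number(TREE, "<integer>"))  # 16.0
```

Note that the numeric interpretation of `<integer>` is `16.0`, not `6.0`: both the outer `<integer>` (deriving `16`) and the nested one (deriving `6`) match, and the maximum is returned.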

### Extracting Feature Sets from Grammars

In [None]:
def extract_existence_features(grammar: Grammar) -> List[ExistenceFeature]:
    '''
        Extracts all existence features from the grammar and returns them as a list.
        grammar : The input grammar.
    '''

    features = []

    for rule in grammar:
        # add the rule
        features.append(ExistenceFeature(f"exists({rule})", rule, rule))
        # add all alternatives
        for count, expansion in enumerate(grammar[rule]):
            name = f"exists({rule}@{count})"
            friendly_name = f"{rule} == {repr(expansion)}"
            feature = ExistenceFeature(name, rule, expansion,
                                       friendly_name=friendly_name)
            features.append(feature)

    return features

In [None]:
from collections import defaultdict
import re

# Regex for non-terminal symbols in expansions
RE_NONTERMINAL = re.compile(r'(<[^<> ]*>)')

def extract_numeric_features(grammar: Grammar) -> List[NumericInterpretation]:
    '''
        Extracts all numeric interpretation features from the grammar and returns them as a list.

        grammar : The input grammar.
    '''

    features = []

    # Mapping from non-terminals to derivable terminal chars
    derivable_chars = defaultdict(set)

    for rule in grammar:
        for expansion in grammar[rule]:
            # Remove non-terminal symbols and whitespace from expansion
            terminals = re.sub(RE_NONTERMINAL, '', expansion).replace(' ', '')

            # Add each terminal char to the set of derivable chars
            for c in terminals:
                derivable_chars[rule].add(c)

    # Repeatedly update the mapping until convergence
    while True:
        updated = False
        for rule in grammar:
            for r in reachable_nonterminals(grammar, rule):
                before = len(derivable_chars[rule])
                derivable_chars[rule].update(derivable_chars[r])
                after = len(derivable_chars[rule])

                # Set of derivable chars was updated
                if after > before:
                    updated = True

        if not updated:
            break

    numeric_chars = set('0123456789.-')

    for key in derivable_chars:
        # Check if derivable chars contain only numeric chars
        if len(derivable_chars[key] - numeric_chars) == 0:
            name = f"num({key})"
            friendly_name = f"{key}"

            features.append(NumericInterpretation(name, key,
                                                  friendly_name=friendly_name))

    return features

In [None]:
def extract_all_features(grammar: Grammar) -> List[Feature]:
    return (extract_existence_features(grammar)
            + extract_numeric_features(grammar))

In [None]:
extract_all_features(CALC_GRAMMAR)

In [None]:
[f.friendly_name() for f in extract_all_features(CALC_GRAMMAR)]

### Extracting Feature Values from Inputs

**Note**: This is a rather slow implementation. For grammars with many syntactic features, the feature collection can be optimized.

In [None]:
def collect_features(sample_list: List[str],
                     grammar: Grammar) -> pandas.DataFrame:

    data = []

    # parse grammar and extract features
    all_features = extract_all_features(grammar)

    # iterate over all samples
    for sample in sample_list:
        parsed_features = {}
        parsed_features["sample"] = sample
        # initialize dictionary
        for feature in all_features:
            parsed_features[feature.name] = 0

        # Obtain the parse tree for each input file
        earley = EarleyParser(grammar)
        for tree in earley.parse(sample):

            for feature in all_features:
                parsed_features[feature.name] = feature.get_feature_value(tree)

        data.append(parsed_features)

    return pandas.DataFrame.from_records(data)

In [None]:
sample_list = ["sqrt(-900)", "sin(24)", "cos(-3.14)"]
collect_features(sample_list, CALC_GRAMMAR)

In [None]:
# TODO: handle multiple trees
def compute_feature_values(sample: str, grammar: Grammar, features: List[Feature]) -> Dict[str, float]:
    '''
        Extracts all feature values from an input.

        sample   : The input.
        grammar  : The input grammar.
        features : The list of input features extracted from the grammar.

    '''
    # Use the grammar and features passed as arguments
    earley = EarleyParser(grammar)

    feature_values = {}
    for tree in earley.parse(sample):
        for feature in features:
            feature_values[feature.name_rep()] = feature.get_feature_value(tree)
    return feature_values

In [None]:
all_features = extract_all_features(CALC_GRAMMAR)
for sample in sample_list:
    print(f"Features of {sample}:")
    features = compute_feature_values(sample, CALC_GRAMMAR, all_features)
    for feature, value in features.items():
        print(f"    {feature}: {value}")

### Excursion: Transforming Grammars

In [None]:
import random

# For this example, fix the random seed so that the produced output is deterministic
random.seed(24)
f = GrammarFuzzer(EXPR_GRAMMAR, max_nonterminals=3)
test_input = f.fuzz()
assert(test_input == tree_to_string(f.derivation_tree))

display(display_tree(f.derivation_tree))

For the grammar transformation, we perform a *rewrite step*, as described in the Alhazen paper: for each non-terminal symbol in the grammar, we determine the word derived by this symbol in the input and add it as an alternative to the symbol. To do so, we iterate through the derivation tree of the input and add the derived word of each non-terminal as an alternative to the grammar.

Let us write a function `transform_grammar` that given a sample input and a grammar, transforms it according to Kampmann et al.

```python

def transform_grammar(sample: str,
                     grammar: Grammar) -> Grammar

```

**INPUT**:
The function requires the following input parameters:
- sample: an input sample
- grammar: the grammar that should be transformed/extended

**OUTPUT**: the function should return the transformed and extended grammar.

In [None]:
# Then, recursively iterate through the derivation tree and for each non-terminal,
# add the derived word to the grammar

def extend_grammar(derivation_tree, grammar):
    (node, children) = derivation_tree

    if is_nonterminal(node):
        assert(node in grammar)
        word = tree_to_string(derivation_tree)

        # Only add to grammar if not already existent
        if word not in grammar[node]:
            grammar[node].append(word)

    for child in children:
        extend_grammar(child, grammar)

In [None]:
import copy

def transform_grammar(sample: str,
                      grammar: Grammar) -> Grammar:
    # copy of the grammar
    transformed_grammar = copy.deepcopy(grammar)

    # parse sample
    earley = EarleyParser(grammar)
    for derivation_tree in earley.parse(sample):
        extend_grammar(derivation_tree, transformed_grammar)

    return transformed_grammar

In [None]:
# TODO Add better test case for correct validation

transformed_grammar = transform_grammar("1 + 2", EXPR_GRAMMAR)
for rule in transformed_grammar:
    print(rule.ljust(10), transformed_grammar[rule])

### End of Excursion

## Step 2: Train Classification Model

### Decision Trees

We use `scikit-learn` as the machine learning library. Its `DecisionTreeClassifier` learns the syntactical input features that are responsible for the bug-triggering behavior of our calculator, the subject under test (SUT). We also use the `tree` module and `graphviz` to visualize the learned decision tree.

In [None]:
import sklearn
from sklearn.tree import DecisionTreeClassifier
from sklearn.feature_extraction import DictVectorizer

First, we transform the individual input features (represented as dicts) into a NumPy array. For this example, we use the following four features (`function-sqrt`, `function-cos`, `function-sin`, `number`) to describe an input. (Please note that this is an extremely reduced example; this is not the complete list of features that should be extracted from the grammar `CALC_GRAMMAR`.)

The features `function-sqrt`, `function-cos`, and `function-sin` state whether the function _sqrt_, _cos_, or _sin_ was used. The feature is `1` if the sample contains the respective function; otherwise, it is `0`.

For each input `<function>(x)`, the `number` feature describes which value was used for `x`. For instance, the first input `sqrt(-900)` corresponds to `'function-sqrt': 1` and `'number': -900`.

In [None]:
# Features for each input, one dict per input
features = [
    {'function-sqrt': 1, 'function-cos': 0, 'function-sin': 0, 'number': -900}, # sqrt(-900)
    {'function-sqrt': 0, 'function-cos': 1, 'function-sin': 0, 'number': 300}, # cos(300)
    {'function-sqrt': 1, 'function-cos': 0, 'function-sin': 0, 'number': -1}, # sqrt(-1)
    {'function-sqrt': 0, 'function-cos': 1, 'function-sin': 0, 'number': -10}, # cos(-10)
    {'function-sqrt': 0, 'function-cos': 0, 'function-sin': 1, 'number': 36}, # sin(36)
    {'function-sqrt': 0, 'function-cos': 0, 'function-sin': 1, 'number': -58}, # sin(-58)
    {'function-sqrt': 1, 'function-cos': 0, 'function-sin': 0, 'number': 27}, # sqrt(27)
]

We define a list of labels (or oracles) that state whether the specific input resulted in a bug or not. We use the `OracleResult` class to keep everything tidy and clean.

In [None]:
# Labels for each input
oracle = [
    OracleResult.BUG,
    OracleResult.NO_BUG,
    OracleResult.BUG,
    OracleResult.NO_BUG,
    OracleResult.NO_BUG,
    OracleResult.NO_BUG,
    OracleResult.NO_BUG
]

# Transform to numpy array
vec = DictVectorizer()
X = vec.fit_transform(features).toarray()

Using the feature array and labels, we can now train a decision tree classifier as follows:

In [None]:
# Fix the random state to produce a deterministic result (for illustration purposes only)
clf = DecisionTreeClassifier(random_state=10)

# scikit-learn requires an array of strings
oracle_clean = [str(c) for c in oracle]
clf = clf.fit(X, oracle_clean)

Let's have a look at the learned decision tree:

In [None]:
import graphviz

In [None]:
def show_decision_tree(clf, feature_names):
    dot_data = sklearn.tree.export_graphviz(clf, out_file=None, 
                                    feature_names=feature_names,
                                    class_names=["BUG", "NO_BUG"],  
                                    filled=True, rounded=True)  
    return graphviz.Source(dot_data)

In [None]:
show_decision_tree(clf, vec.get_feature_names_out())

In [None]:
import math

def friendly_decision_tree(clf, feature_names, class_names = ['NO_BUG', 'BUG']):
    def _tree(index, indent=0):
        s = ""
        feature = clf.tree_.feature[index]
        feature_name = feature_names[feature]
        threshold = clf.tree_.threshold[index]
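        value = clf.tree_.value[index]
        # class_names is ordered ['NO_BUG', 'BUG'], whereas the classifier sorts its
        # classes as ['BUG', 'NO_BUG']: pick index 1 iff 'BUG' is the majority class
        class_ = int(value[0][0] == max(value[0]))
        class_name = class_names[class_]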
        left = clf.tree_.children_left[index]
        right = clf.tree_.children_right[index]
        if left == right:
            # Leaf node
            s += " " * indent + class_name + "\n"
        else:
            if math.isclose(threshold, 0.5):
                s += " " * indent + f"if {feature_name}:\n"
                s += _tree(right, indent + 2)
                s += " " * indent + f"else:\n"
                s += _tree(left, indent + 2)
            else:
                s += " " * indent + f"if {feature_name} <= {threshold:.4f}:\n"
                s += _tree(left, indent + 2)
                s += " " * indent + f"else:\n"
                s += _tree(right, indent + 2)
        return s

    return _tree(0)

In [None]:
print(friendly_decision_tree(clf, vec.get_feature_names_out()))

We can see that our initial hypothesis is that the feature `function-sqrt` must be greater than 0.5 (i.e., present) and the feature `number` must be less than or equal to 13 in order to produce a bug. The decision rule is not yet perfect, so we need to refine our decision tree!
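
Written out as a plain predicate, this hypothesis reads as follows. This is a minimal sketch: the function `hypothesis()` and the tuple encoding of the training inputs are introduced here for illustration only.

```python
# (function, number, label) for the seven training inputs used above
training = [
    ("sqrt", -900, "BUG"),
    ("cos", 300, "NO_BUG"),
    ("sqrt", -1, "BUG"),
    ("cos", -10, "NO_BUG"),
    ("sin", 36, "NO_BUG"),
    ("sin", -58, "NO_BUG"),
    ("sqrt", 27, "NO_BUG"),
]


def hypothesis(function: str, number: float) -> str:
    """The learned rule: sqrt is present and number <= 13."""
    return "BUG" if function == "sqrt" and number <= 13 else "NO_BUG"


# The rule reproduces every training label ...
assert all(hypothesis(f, n) == label for f, n, label in training)

# ... but it also predicts a failure for sqrt(4)
print(hypothesis("sqrt", 4))  # BUG
```

The rule is consistent with all seven training inputs, yet it would also classify `sqrt(4)` as failing, although that input was observed earlier to run without a failure. New inputs like this one are what allow the hypothesis to be refined.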

### Learning a Decision Tree

For _Alhazen's_ second activity (Train Classification Model), you are required to write a function `train_tree(data)` that trains a decision tree on a given data frame. `train_tree(data)` should return the learned decision tree.

```python

def train_tree(data: pandas.core.frame.DataFrame) -> sklearn.tree._classes.DecisionTreeClassifier

```

**INPUT**:
the function requires the following parameter:
- data: a pandas dataframe containing the parsed and extracted features and the outcome of the executed input sample (oracle). For instance, the data frame may look similar to this:

| feature_1     | feature_2     | ...    |oracle|
| ------------- |-------------|-------------|-----|
| 1     | 0 | ...| 'BUG' |
| 0     | 1 | ...| 'NO_BUG' |

**Note:** Each row of `data['oracle']` is of type `OracleResult`. However, scikit-learn requires an array of strings. Convert the labels to strings before learning the decision tree.

**OUTPUT**: the function should return a learned decision tree of type _sklearn.tree._classes.DecisionTreeClassifier_
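
The conversion mentioned in the note can be sketched without pandas. The `OracleResult` class is repeated here only so that the block runs on its own; for a data frame, the chapter uses `data['oracle'].astype(str)` to the same effect.

```python
from enum import Enum


class OracleResult(Enum):
    """Copy of the OracleResult class above, repeated for self-containment."""
    BUG = "BUG"
    NO_BUG = "NO_BUG"
    UNDEF = "UNDEF"

    def __str__(self):
        return self.value


oracle = [OracleResult.BUG, OracleResult.NO_BUG, OracleResult.BUG]

# str() uses the __str__ method above, yielding the plain label
labels = [str(result) for result in oracle]
print(labels)  # ['BUG', 'NO_BUG', 'BUG']
```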

In [None]:
def train_tree(data):
    sample_bug_count = len(data[(data["oracle"].astype(str) == "BUG")])
    assert sample_bug_count > 0, "No bug samples found"
    sample_count = len(data)

    # Weight both classes inversely to their frequency
    clf = DecisionTreeClassifier(
        min_samples_leaf=1,
        min_samples_split=2,  # minimal value
        max_features=None,
        max_depth=5,  # max depth of the decision tree
        class_weight={"BUG": 1.0 / sample_bug_count,
                      "NO_BUG": 1.0 / (sample_count - sample_bug_count)})
    clf = clf.fit(data.drop('oracle', axis=1), data['oracle'].astype(str))
    # Optional extension: pruning infeasible paths, e.g. with
    # treetools.remove_infeasible(clf, features), results in nicer decision trees
    return clf
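
The `class_weight` argument in `train_tree()` gives each class a weight inversely proportional to its number of samples. A small worked example, using the seven labels from the decision tree example above, shows the effect (a sketch of the arithmetic only; no classifier is trained here):

```python
labels = ["BUG", "NO_BUG", "BUG", "NO_BUG", "NO_BUG", "NO_BUG", "NO_BUG"]

bug_count = labels.count("BUG")          # 2
no_bug_count = len(labels) - bug_count   # 5

# Same formula as in train_tree()
class_weight = {"BUG": 1.0 / bug_count, "NO_BUG": 1.0 / no_bug_count}
print(class_weight)  # {'BUG': 0.5, 'NO_BUG': 0.2}

# Total weight per class: per-sample weight times number of samples
total_weight = {c: class_weight[c] * labels.count(c) for c in class_weight}
print(total_weight)
```

Both classes end up with the same total weight, so the few failing samples are not outvoted by the many passing ones.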

## Step 3: Extract Feature Requirements

In this section, we will extract the learned features from the decision tree.

In [None]:
# Features for each input, one dict per input
features = [
    {'function-sqrt': 1, 'function-cos': 0, 'function-sin': 0, 'number': -900},
    {'function-sqrt': 0, 'function-cos': 1, 'function-sin': 0, 'number': 300},
    {'function-sqrt': 1, 'function-cos': 0, 'function-sin': 0, 'number': -1},
    {'function-sqrt': 0, 'function-cos': 1, 'function-sin': 0, 'number': -10},
    {'function-sqrt': 0, 'function-cos': 0, 'function-sin': 1, 'number': 36},
    {'function-sqrt': 0, 'function-cos': 0, 'function-sin': 1, 'number': -58},
    {'function-sqrt': 1, 'function-cos': 0, 'function-sin': 0, 'number': 27},
]

# Labels for each input
oracle = [
    "BUG",
    "NO_BUG",
    "BUG",
    "NO_BUG",
    "NO_BUG",
    "NO_BUG",
    "NO_BUG"
]

# We can use the sklearn DictVectorizer to transform the features to numpy array:
# Notice: Use the correct labeling of the feature_names

# vec = DictVectorizer()
# X_vec = vec.fit_transform(features).toarray()
# feature_names = vec.get_feature_names_out()

# We can also use a pandas DataFrame and pass it directly to the decision tree learner
feature_names = ['function-sqrt', 'function-cos', 'function-sin', 'number']
X_data = pandas.DataFrame.from_records(features)

# Fix the random state to produce a deterministic result (for illustration purposes only)
clf = DecisionTreeClassifier(random_state=10)

# Train with DictVectorizer
# clf = clf.fit(X_vec, oracle)

# Train with Pandas Dataframe
clf = clf.fit(X_data, oracle)

dot_data = sklearn.tree.export_graphviz(clf, out_file=None, 
                                feature_names=feature_names,
                                class_names=["BUG", "NO BUG"],  
                                filled=True, rounded=True)  
graph = graphviz.Source(dot_data)  

**Note:** By default, the sklearn DictVectorizer sorts the feature names internally, so the resulting column indices can differ from the order of the keys in your feature dictionaries. If you want to use the DictVectorizer, please ensure that you only access the feature names with the function <i>vec.get_feature_names_out()</i>. We recommend that you use the pandas DataFrame, since this is also the format used in the feedback loop.
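
To see why the column order matters, the following sketch mimics alphabetical sorting with plain Python. It does not call scikit-learn; `insertion_order` and `sorted_order` are illustrative names only. The same feature ends up at a different index, so a hand-written `feature_names` list would mislabel the tree.

```python
# One feature dict, with keys in the order used throughout this section
sample = {'function-sqrt': 1, 'function-cos': 0, 'function-sin': 0, 'number': -900}

insertion_order = list(sample)   # order of a hand-written feature_names list
sorted_order = sorted(sample)    # alphabetical order, as a sorting vectorizer would use

for name in insertion_order:
    print(f"{name}: index {insertion_order.index(name)} -> {sorted_order.index(name)}")
```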

In [None]:
graph

In [None]:
print(friendly_decision_tree(clf, feature_names, class_names = ['NO_BUG', 'BUG']))

### Excursion: Tree helper functions

In [None]:
"""This file bundles several functions which are helpful when working with decision trees."""

def all_path(clf, node=0):
    """Iterate over all paths in a decision tree. Each path is represented as
    a list of integers; each integer is the index of a node in the clf.tree_ structure."""
    left = clf.tree_.children_left[node]
    right = clf.tree_.children_right[node]

    if left == right:
        yield [node]
    else:
        for path in all_path(clf, left):
            yield [node] + path
        for path in all_path(clf, right):
            yield [node] + path


def path_samples(clf, path):
    """Returns the number of samples for this path. """
    return clf.tree_.n_node_samples[path[-1]]


def generic_feature_names(clf):
    """Gives a list of feature names of the form f1, f2, ..."""
    return ["f{}".format(f) for f in range(0, clf.tree_.n_features)]


def box(clf, path, data=None, feature_names=None):
    """For a decision tree classifier clf and a path path (as returned, e.g. by all_path),
    this method gives a pandas DataFrame with the min and max of each feature value on the given path."""

    if feature_names is None:
        feature_names = generic_feature_names(clf)
    check_for_duplicates(feature_names)
    if data is None:
        bounds = pandas.DataFrame([{'feature': c, 'min': -numpy.inf, 'max': numpy.inf} for c in feature_names],
                                  columns=['feature', 'min', 'max']).set_index(['feature']).transpose()
    else:
        bounds = pandas.DataFrame([{'feature': c, 'min': data[c].min(), 'max': data[c].max()} for c in feature_names],
                                  columns=['feature', 'min', 'max']).set_index(['feature']).transpose()

    for pos in range(0, len(path) - 1):
        node = path[pos]
        child = path[pos + 1]
        feature = feature_names[clf.tree_.feature[node]]
        threshold = clf.tree_.threshold[node]

        if child == clf.tree_.children_left[node]:
            bounds.at['max', feature] = threshold
        else:
            bounds.at['min', feature] = threshold
    return bounds


def rectangles(clf, colormap, data, feature_names=None):
    """yields matplotlib.patches rectangle objects. Each object represents a leaf of the tree."""
    if feature_names is None:
        feature_names = ['in_x', 'in_y']
    if 2 != len(feature_names):
        raise AssertionError("Rectangles can only be generated if there are exactly 2 features.")

    x_feature = feature_names[0]
    y_feature = feature_names[1]

    for path in all_path(clf):
        b = box(clf, path, data=data, feature_names=feature_names)
        p = prediction_for_path(clf, path)
        c = colormap[p]
        rect = matplotlib.patches.Rectangle((b[x_feature]['min'], 
                                             b[y_feature]['min']),
                             # coordinates
                             b[x_feature]['max'] - b[x_feature]['min'],  # width
                             b[y_feature]['max'] - b[y_feature]['min'],  # height
                             alpha=.2, facecolor=c, edgecolor='k')
        yield rect


def prediction_for_path(clf, path) -> OracleResult:
    last_value = clf.tree_.value[path[-1]][0]
    p_class = numpy.argmax(last_value)
    return OracleResult(clf.classes_[p_class])


def rule(clf, path, feature_names, class_names=None):
    """Creates a rule from one path in the decision tree."""
    bounds = box(clf, path, feature_names=feature_names)
    prediction = prediction_for_path(clf, path)
    if class_names is not None:
        prediction = class_names[prediction]

    feature_rules = []
    for fname in feature_names:
        min_ = bounds[fname]['min']
        max_ = bounds[fname]['max']

        if numpy.isinf(min_) and numpy.isinf(max_):
            pass  # no rule if both are unbound
        elif numpy.isinf(min_):
            feature_rules.append("{} <= {:.4f}".format(fname, max_))
        elif numpy.isinf(max_):
            feature_rules.append("{} > {:.4f}".format(fname, min_))
        else:
            feature_rules.append("{} in {:.4f} to {:.4f}".format(fname, min_, max_))

    return " AND ".join(feature_rules), prediction, clf.tree_.impurity[path[-1]], clf.tree_.n_node_samples[path[-1]]


def rules(clf, class_names=None, feature_names=None):
    """Formats Decision trees in a rule-like representation."""

    if feature_names is None:
        feature_names = generic_feature_names(clf)

    samples = clf.tree_.n_node_samples[0]
    return "\n".join(["IF {2} THEN PREDICT '{3}' ({0}: {4:.4f}, support: {5} / {1})"
                     .format(clf.criterion, samples,
                             *rule(clf, path, feature_names, class_names=class_names)) for path in all_path(clf)])


def grouped_rules(clf, class_names=None, feature_names=None):
    """Formats decision trees in a rule-like representation, grouped by class."""

    if feature_names is None:
        feature_names = generic_feature_names(clf)

    rules = {}
    for path in all_path(clf):
        rulestr, clz, impurity, support = rule(clf, path, class_names=class_names, feature_names=feature_names)
        if clz not in rules:
            rules[clz] = []
        rules[clz].append((rulestr, impurity, support))

    res = ""
    samples = clf.tree_.n_node_samples[0]
    for clz in rules:
        rulelist = rules[clz]
        res = res + "\n{}:\n\t".format(clz)
        rl = ["{} ({}: {:.4f}, support: {}/{})".format(r, clf.criterion, impurity, support, samples) for r, impurity, support in rulelist]
        res = res + "\n\tor ".join(rl)
    return res.lstrip()


def check_for_duplicates(names):
    seen = set()
    for name in names:
        if name in seen:
            raise AssertionError("Duplicate name: {}".format(name))
        seen.add(name)


def is_leaf(clf, node: int) -> bool:
    """returns true if the given node is a leaf."""
    return clf.tree_.children_left[node] == clf.tree_.children_right[node]


def leaf_label(clf, node: int) -> int:
    """returns the index of the class at this node. The node must be a leaf."""
    assert(is_leaf(clf, node))
    occs = clf.tree_.value[node][0]
    idx = 0
    maxi = occs[idx]
    for i, o in zip(range(0, len(occs)), occs):
        if maxi < o:
            maxi = o
            idx = i
    return idx


def remove_infeasible(clf, features: List[Feature]):
    for node in range(0, clf.tree_.node_count):
        if not is_leaf(clf, node):
            feature = features[clf.tree_.feature[node]]
            threshold = clf.tree_.threshold[node]
            if not feature.is_feasible(threshold):
                clf.tree_.feature[node] = find_existence_index(features, feature)
                clf.tree_.threshold[node] = 0.5
    return clf


def iterate_nodes(clf):
    stack = [0]
    while 0 != len(stack):
        node = stack.pop()
        yield node
        if not is_leaf(clf, node):
            stack.append(clf.tree_.children_left[node])
            stack.append(clf.tree_.children_right[node])


def count_nodes(clf):
    return len(list(iterate_nodes(clf)))


def count_leaves(clf):
    return len([n for n in iterate_nodes(clf) if is_leaf(clf, n)])


def list_features(clf):
    return [clf.tree_.feature[node] for node in iterate_nodes(clf)]


def remove_unequal_decisions(clf):
    """
    This method rewrites a decision tree classifier to remove nodes where the same
    decision is taken on both sides.

    :param clf: a decision tree classifier
    :return: the same classifier, rewritten
    """
    changed = True
    while changed:
        changed = False
        for node in range(0, clf.tree_.node_count):
            if not is_leaf(clf, node) and (is_leaf(clf, clf.tree_.children_left[node]) and is_leaf(clf, clf.tree_.children_right[node])):
                # both children of this node are leaves
                left_label = leaf_label(clf, clf.tree_.children_left[node])
                right_label = leaf_label(clf, clf.tree_.children_right[node])
                if left_label == right_label:
                    clf.tree_.children_left[node] = -1
                    clf.tree_.children_right[node] = -1
                    clf.tree_.feature[node] = -2
                    changed = True
                    assert(left_label == leaf_label(clf, node))
    return clf


### End of Excursion
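
The core of these helpers is the walk over the tree arrays. The following is a minimal sketch of what `all_path()` and `box()` compute, using hand-written lists in place of a fitted classifier. The values are made up for illustration; the layout (child index `-1` and feature `-2` at leaves) follows scikit-learn's `tree_` arrays.

```python
import math

# Hand-written stand-ins for clf.tree_.children_left, children_right, feature, threshold.
# Node 0 tests "function-sqrt" at 0.5, node 2 tests "number" at 13.0;
# nodes 1, 3, and 4 are leaves (both children are -1).
children_left = [1, -1, 3, -1, -1]
children_right = [2, -1, 4, -1, -1]
feature = [0, -2, 1, -2, -2]
threshold = [0.5, -2.0, 13.0, -2.0, -2.0]
names = ["function-sqrt", "number"]

def paths(node=0):
    """Yield every root-to-leaf path as a list of node indices."""
    if children_left[node] == children_right[node]:  # leaf
        yield [node]
    else:
        for child in (children_left[node], children_right[node]):
            for rest in paths(child):
                yield [node] + rest

def bounds(path):
    """Return {feature name: (min, max)} for the splits taken along path."""
    result = {name: (-math.inf, math.inf) for name in names}
    for node, child in zip(path, path[1:]):
        name = names[feature[node]]
        lo, hi = result[name]
        if child == children_left[node]:
            result[name] = (lo, threshold[node])   # left branch: value <= threshold
        else:
            result[name] = (threshold[node], hi)   # right branch: value > threshold
    return result

for p in paths():
    print(p, bounds(p))
```

Each printed line pairs one leaf with the interval every feature must lie in to reach it; features that are never tested on the path stay unbounded.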

### Excursion: Converting Trees to Paths

In [None]:
import logging

def tree_to_paths(tree, features: List[Feature]):
    logging.info("Extracting requirements from tree ...")
    paths = []
    # go through tree leaf by leaf
    for path in all_path(tree):
        requirements = []
        is_bug = OracleResult.BUG == prediction_for_path(tree, path)
        # find the requirements
        box_ = box(tree, path, feature_names=features).transpose()
        for feature, row in box_.iterrows():
            mini = row['min']
            maxi = row['max']
            if (not numpy.isinf(mini)) or (not numpy.isinf(maxi)):
                requirements.append(TreeRequirement(feature, mini, maxi))
        paths.append(TreePath(None, is_bug, requirements))

    return paths


In [None]:
class TreeRequirement:
    def __init__(self, feature: Feature, mini, maxi):
        self.__feature: Feature = feature
        self.__mini = mini
        self.__maxi = maxi

    def feature(self) -> Feature:
        return self.__feature

    def select(self, data):
        """Returns a vector of booleans, suitable for selecting in a pandas data frame."""
        if self.__mini is None:
            return data[self.__feature.name()] <= self.__maxi
        if self.__maxi is None:
            return self.__mini <= data[self.__feature.name()]
        return (self.__mini <= data[self.__feature.name()]) & (data[self.__feature.name()] <= self.__maxi)

    def mini(self):
        return self.__mini

    def maxi(self):
        return self.__maxi

    def get_key(self) -> str:
        return self.__feature.key()

    def is_binary(self) -> bool:
        return self.__feature.is_binary()

    def get_str(self, bounds) -> str:
        if self.is_binary():
            if self.__mini < 0 <= self.__maxi:
                # feature is NOT included
                return f"!{self.__feature.name()}"
            if self.__mini < 1 <= self.__maxi:
                # feature is included
                return self.__feature.name()
            raise AssertionError("How is this possible?")
        else:
            if (not numpy.isinf(self.__mini)) and (not numpy.isinf(self.__maxi)):
                return f"{self.__feature.name()} in [{self.__mini}, {self.__maxi}]"
            elif not numpy.isinf(self.__maxi):
                return f"{self.__feature.name()} <= {self.__maxi}"
            else:
                return f"{self.__feature.name()} > {self.__mini}"

    def get_str_ext(self) -> str:
        if (not numpy.isinf(self.__mini)) and (not numpy.isinf(self.__maxi)):
            return f"{self.__feature} in [{self.__mini}, {self.__maxi}]"
        elif not numpy.isinf(self.__maxi):
            return f"{self.__feature} <= {self.__maxi}"
        else:
            return f"{self.__feature} > {self.__mini}"

    def get_neg(self, bounds) -> List[str]:
        if self.is_binary():
            if self.__mini < 0 <= self.__maxi:
                # feature is NOT included, so, the negated condition is to include it
                return [self.__feature.name()]
            if self.__mini < 1 <= self.__maxi:
                # feature is included, so exclude it
                return [f"!{self.__feature.name()}"]
            raise AssertionError("How is this possible?")
        else:
            if (not numpy.isinf(self.__mini)) and (not numpy.isinf(self.__maxi)):
                return [f"{self.__feature.name()} in [{bounds.at['min', self.__feature.name()]},{self.__mini}]",
                        f"{self.__feature.name()} in [{self.__maxi}, {bounds.at['max', self.__feature.name()]}]"]
            elif not numpy.isinf(self.__maxi):
                # negation of "feature <= maxi"
                return [f"{self.__feature.name()} > {self.__maxi}"]
            else:
                # negation of "feature > mini"
                return [f"{self.__feature.name()} <= {self.__mini}"]

    def get_neg_ext(self, bounds) -> List[str]:
        if (not numpy.isinf(self.__mini)) and (not numpy.isinf(self.__maxi)):
            return [f"{self.__feature} in [{bounds.at['min', self.__feature]},{self.__mini}]",
                    f"{self.__feature} in [{self.__maxi}, {bounds.at['max', self.__feature]}]"]
        elif not numpy.isinf(self.__maxi):
            return [f"{self.__feature} > {self.__maxi}"]
        else:
            return [f"{self.__feature} <= {self.__mini}"]

In [None]:
from pathlib import Path

class TreePath:
    def __init__(self, samplefile: Optional[Path], is_bug: bool, requirements: List[TreeRequirement]):
        self.__sample = samplefile
        self.__is_bug = is_bug
        self.__requirements: List[TreeRequirement] = requirements

    def is_bug(self) -> bool:
        return self.__is_bug

    def get(self, idx):
        return self.__requirements[idx]

    def find_sample(self, data):
        for req in self.__requirements:
            data = data[req.select(data)]
        if 0 != len(data):
            # use positional access: the filtered frame may no longer contain index label 0
            return data["abs_file"].iloc[0]
        return None

    def __len__(self) -> int:
        return len(self.__requirements)


def lower_middle(start, end):
    if start == end:
        return start - abs(start)
    return start + ((end - start)/2)


def upper_middle(start, end):
    if start == end:
        return end + abs(end)
    return start + ((end - start)/2)


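def min_digits(mini):
    # smallest integer with `mini` digits, e.g. 3 -> 100
    return int("1" + "".join(["0"] * int(mini-1)))


def max_digits(maxi):
    # largest integer with `maxi` digits, e.g. 3 -> 999
    return int("".join(["9"] * int(maxi)))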

### End of Excursion

In [None]:
# We provide a function to extract the paths from a decision tree.
all_paths = tree_to_paths(clf, feature_names)

In [None]:
for count, path in enumerate(all_paths):
    string_path = path.get(0).get_str_ext()
    for box_ in range(1, len(path)):
        string_path += " " + path.get(box_).get_str_ext()
    print(f"Path {count}: {string_path}, is_bug: {path.is_bug()}")

## Step 4: Generating New Samples

### Negating Requirements

The next step is to negate the requirements on a path to refine or refute the decision tree.

First, we determine the bounds of each feature to obtain better path negations.

In [None]:
x = pandas.DataFrame.from_records(features)
bounds = pandas.DataFrame([{'feature': c, 'min': x[c].min(), 'max': x[c].max()}
                           for c in feature_names],
                          columns=['feature', 'min', 'max']).set_index(['feature']).transpose()

We can use the function `path.get(i).get_neg_ext(bounds)` to obtain a negation for a single requirement on a path (indexed with `i`).
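
As a rough, self-contained sketch of the negation rule (written to mirror the three cases of `get_neg_ext()`, not a replacement for it): a one-sided requirement flips its comparison, while a closed interval is negated by the two intervals around it, limited by the observed bounds. The function name `negate` and its parameters are illustrative assumptions.

```python
import math

def negate(feature, mini, maxi, lowest, highest):
    """Return the textual negation(s) of the requirement mini < feature <= maxi.

    lowest and highest are the smallest and largest observed values of the feature."""
    if not math.isinf(mini) and not math.isinf(maxi):
        # a closed interval is negated by the two intervals around it
        return [f"{feature} in [{lowest},{mini}]", f"{feature} in [{maxi}, {highest}]"]
    if not math.isinf(maxi):
        return [f"{feature} > {maxi}"]     # negation of "feature <= maxi"
    return [f"{feature} <= {mini}"]        # negation of "feature > mini"

print(negate("number", -math.inf, 13.0, -900, 300))
print(negate("number", 0.5, 13.0, -900, 300))
```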

Let's check whether we can negate a whole path.

In [None]:
for count, path in enumerate(all_paths):
    negated_string_path = path.get(0).get_neg_ext(bounds)[0]
    for box_ in range(1, len(path)):
        negated_string_path += " " + str(path.get(box_).get_neg_ext(bounds)[0])
    print(f"Path {count}: {negated_string_path}, is_bug: {path.is_bug()}")

### Systematically Negating Paths

We will use the decision tree to extract new input specifications that refine or refute our hypothesis (see Section 4.1 of the paper, "Extracting Prediction Paths"). These input specifications are passed to the input generator, which tries to generate new inputs that fulfill them.
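
The enumeration in the next cell is driven by bit strings: position `p` of the string decides whether requirement `p` on the path is kept (`'0'`) or negated (`'1'`). The following sketch shows that idea in isolation; the requirement strings are made up for illustration.

```python
requirements = ["exists(sqrt) > 0.5", "num(term) <= 13.0", "num(term) > -5.0"]
negations = ["exists(sqrt) <= 0.5", "num(term) > 13.0", "num(term) <= -5.0"]

n = len(requirements)
variants = []
for i in range(n + 1):
    bits = format(i, f"0{n}b")   # zero-padded binary, e.g. 2 -> '010'
    variant = [negations[p] if b == "1" else requirements[p]
               for p, b in enumerate(bits)]
    variants.append((bits, variant))

for bits, variant in variants:
    print(bits, ", ".join(variant))
```

Note that counting from `0` to `n` visits `n + 1` bit patterns rather than all `2**n` combinations, so only some combinations of negated requirements are produced.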

In [None]:
def extracting_prediction_paths(clf, feature_names, data):
    
    # determine the bounds
    bounds = pandas.DataFrame([{'feature': c, 'min': data[c].min(), 'max': data[c].max()}
                           for c in feature_names],
                          columns=['feature', 'min', 'max']).set_index(['feature']).transpose()
    
    # go through tree leaf by leaf
    all_reqs = set()
    for path in tree_to_paths(clf, feature_names):
        # generate conditions
        for i in range(0, len(path)+1):
            reqs_list = []
            bins = format(i, "#0{}b".format(len(path)+2))[2:]
            for p, b in zip(range(0, len(bins)), bins):
                r = path.get(p)
                if '1' == b:
                    reqs_list.append(r.get_neg_ext(bounds))
                else:
                    reqs_list.append([r.get_str_ext()])
            for reqs in all_combinations(reqs_list):
                all_reqs.add(", ".join(sorted(reqs)))
    return all_reqs

def all_combinations(reqs_lists):
    result = [[]]
    for reqs in reqs_lists:
        t = []
        for r in reqs:
            for i in result:
                t.append(i+[r])
        result = t
    return result

In [None]:
new_prediction_paths = extracting_prediction_paths(clf, feature_names, data=x)

In [None]:
for path in new_prediction_paths:
    print(path)

### Input Specification Parser

In [None]:
import string

SPEC_GRAMMAR: Grammar = {
    "<start>":
        ["<req_list>"],

    "<req_list>": 
        ["<req>", "<req>"", ""<req_list>"],

    "<req>":
        ["<feature>"" ""<quant>"" ""<num>"],

    "<feature>": ["exists(<string>)",
                  "num(<string>)",
                  # currently not used
                  "char(<string>)",
                  "length(<string>)"],

    "<quant>":
        ["<", ">", "<=", ">="],

    "<num>": ["-<value>", "<value>"],

    "<value>":
        ["<integer>.<integer>",
         "<integer>"],

    "<integer>":
        ["<digit><integer>", "<digit>"],

    "<digit>":
        ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"],

    '<string>': ['<letters>'],
    '<letters>': ['<letter><letters>', '<letter>'],
    '<letter>': list(string.ascii_letters + string.digits + string.punctuation)
}

assert is_valid_grammar(SPEC_GRAMMAR)

### Excursion: Validating the Parser

Let's validate our grammar by using it to produce sample requirement specifications: we print 10 of them and then check that 100 more can be parsed back unchanged.

In [None]:
g = GrammarFuzzer(SPEC_GRAMMAR, max_nonterminals=100)
earley = EarleyParser(SPEC_GRAMMAR)
for i in range(10):
    sample = g.fuzz()
    print(sample)

In [None]:
g = GrammarFuzzer(SPEC_GRAMMAR, max_nonterminals=100)
earley = EarleyParser(SPEC_GRAMMAR)
for i in range(100):
    sample = g.fuzz()
    for tree in earley.parse(sample):
        assert tree_to_string(tree) == sample, f"{tree_to_string(tree)} and {sample} are not equal"

Let's also try with some real requirement specifications.

In [None]:
earley = EarleyParser(SPEC_GRAMMAR)
teststrings = ['exists(<function>@0) > 0.5, exists(<term>@0) <= 0.5, exists(<value>@1) <= 0.5',
               'exists(<digit>@9) <= 0.5, exists(<function>@0) > 0.5, num(<term>) > 0.05000000074505806',
               'exists(<digit>@2) <= 0.5, exists(<function>@0) < 0.5, num(<term>) <= 0.05000000074505806',
               'exists(<function>@0) > 0.5, num(<term>) > -3965678.1875']
for count, sample in enumerate(teststrings):
    for tree in earley.parse(sample):
        assert tree_to_string(tree) == teststrings[count], \
        f"{tree_to_string(tree)} and {teststrings[count]} are not equal"

### End of Excursion
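
To make the shape of a requirement string explicit, here is a small regular-expression sketch that splits a specification into `(feature, quantifier, value)` triples. It is only an illustration of the format, not the Earley-based parser used in this chapter; `REQ_PATTERN` and `split_specification` are hypothetical names, and the sketch assumes that feature names do not contain the separator `", "`.

```python
import re

REQ_PATTERN = re.compile(
    r"(?P<feature>(?:exists|num|char|length)\(.+\))"   # e.g. exists(<function>@0)
    r" (?P<quant><=|>=|<|>)"                            # comparison operator
    r" (?P<num>-?\d+(?:\.\d+)?)"                        # optionally negative number
)

def split_specification(spec):
    """Split 'req, req, ...' into (feature, quantifier, value) triples."""
    triples = []
    for req in spec.split(", "):
        match = REQ_PATTERN.fullmatch(req)
        if match is None:
            raise SyntaxError(f"Cannot parse requirement: {req!r}")
        triples.append((match["feature"], match["quant"], match["num"]))
    return triples

print(split_specification("exists(<function>@0) > 0.5, num(<term>) > -3965678.1875"))
```

Like the grammar, this sketch rejects interval requirements such as `num(<term>) in [-900, 0]`, since they do not have the `feature quantifier number` form.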

### Retrieving New Input Specifications

In [None]:
class SpecRequirement:
    '''
    This class represents a requirement for a new input sample that should be generated.
    It contains the feature that should be fulfilled (Feature), a quantifier
    ("<", ">", "<=", ">=") and a value. For instance, exists(feature) >= 0.5 states that
    the syntactical existence feature should be used to produce a new input.

    feature  : The associated feature class
    quant    : The quantifier
    value    : The value of the requirement. Note that for existence features this value
                is always between 0 and 1.
    '''

    def __init__(self, feature: Feature, quantificator, value):
        self.feature: Feature = feature
        self.quant = quantificator
        self.value = value

    def __str__(self):
        return f"Requirement({self.feature.name} {self.quant} {self.value})"

    def __repr__(self):
        return f"Requirement({self.feature.name}, {self.quant}, {self.value})"

    def friendly(self):
        def value(x):
            try:
                return float(x)
            except Exception:
                return None

        if isinstance(self.feature, ExistenceFeature):
            if value(self.value) > 0:
                return f"{self.feature.friendly_name()}"
            elif value(self.value) < 0:
                return f"not {self.feature.friendly_name()}"

        return f"{self.feature.friendly_name()} {self.quant} {self.value}"

In [None]:
class InputSpecification:
    '''
    This class represents a complete input specification of a new input. An input specification
    consists of one or more requirements.
    requirements  : A list of all requirements that must be fulfilled.
    '''

    def __init__(self, requirements: List[SpecRequirement]):
        self.requirements: List[SpecRequirement] = requirements

    def __str__(self):
        s = ", ".join(str(r) for r in self.requirements)
        return f"InputSpecification({s})"

    def friendly(self):
        return " and ".join(r.friendly() for r in self.requirements)

    def __repr__(self):
        return self.__str__()

In [None]:
def get_all_subtrees(derivation_tree, non_terminal):
    '''
    Recursively collects and returns a list of all subtrees that start with a given non_terminal.
    '''

    subtrees = []
    (node, children) = derivation_tree

    if node == non_terminal:
        subtrees.append(derivation_tree)

    for child in children:
        subtrees = subtrees + get_all_subtrees(child, non_terminal)

    return subtrees

In [None]:
def create_new_input_specification(derivation_tree, all_features) -> InputSpecification:
    '''
    This function creates a new input specification for a parsed decision tree path.
    The input derivation_tree corresponds to an already negated path in the decision tree.
    '''

    requirement_list = []

    for req in get_all_subtrees(derivation_tree, '<req>'):
        feature_name = tree_to_string(get_all_subtrees(req, '<feature>')[0])
        quant = tree_to_string(get_all_subtrees(req, '<quant>')[0])
        value = tree_to_string(get_all_subtrees(req, '<num>')[0])

        feature_class = None
        for f in all_features:
            if f.name == feature_name:
                feature_class = f

        requirement_list.append(SpecRequirement(feature_class, quant, value))

    return InputSpecification(requirement_list)

In [None]:
def get_all_input_specifications(dec_tree,
                                 all_features: List[Feature],
                                 feature_names: List[str],
                                 data) -> List[InputSpecification]:
    '''
    Returns the complete list of new input specifications that were extracted from a learned decision tree.

    INPUT:
        - dec_tree       : The learned decision tree.
        - all_features   : A list of all features
        - feature_names  : The list of the feature names (feature.name)
        - data           : The data that was used to learn the decision tree

    OUTPUT:
        - Returns a list of InputSpecifications
    '''
    prediction_paths = extracting_prediction_paths(dec_tree, feature_names, data)
    input_specifications = []

    # parse all extracted paths
    for r in prediction_paths:
        earley = EarleyParser(SPEC_GRAMMAR)
        try:
            for tree in earley.parse(r):
                input_specifications.append(create_new_input_specification(tree, all_features))
        except SyntaxError:
            # Catch Parsing Syntax Errors: num(<term>) in [-900, 0] will fail; Might fix later
            # For now, inputs following that form will be ignored
            pass

    return input_specifications

### Excursion: Testing Specifications

In [None]:
sample_prediction_paths = ['exists(<function>@0) > 0.5, num(<term>) <= -38244758.0',
                        'exists(<digit>@7) <= 0.5, exists(<function>@0) > 0.5, num(<term>) <= 0.05000000074505806',
                        'exists(<digit>) > 1.5, exists(<function>@0) > 0.5, num(<term>) <= 0.21850000321865082', 
                        'exists(<function>@0) > 0.5']

expected_input_specifications = ['InputSpecification(Requirement(exists(<function>@0) > 0.5), Requirement(num(<term>) <= -38244758.0))',
                                 'InputSpecification(Requirement(exists(<digit>@7) <= 0.5), Requirement(exists(<function>@0) > 0.5), Requirement(num(<term>) <= 0.05000000074505806))',
                                 'InputSpecification(Requirement(exists(<digit>) > 1.5), Requirement(exists(<function>@0) > 0.5), Requirement(num(<term>) <= 0.21850000321865082))',
                                 'InputSpecification(Requirement(exists(<function>@0) > 0.5))']

all_features = extract_all_features(CALC_GRAMMAR)

earley = EarleyParser(SPEC_GRAMMAR)
for count, sample in enumerate(sample_prediction_paths):
    for tree in earley.parse(sample):
        input_specification = create_new_input_specification(tree, all_features)
        assert str(input_specification) == expected_input_specifications[count], \
            f"{str(input_specification)} is not equal to {expected_input_specifications[count]}"

If no assertion is triggered, then everything seems to work.

### End of Excursion
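
The conversion from a parsed requirement string to `SpecRequirement` objects relies on collecting subtrees of a derivation tree. The following sketch replays that step on a hand-written, simplified tree of `(symbol, children)` pairs. The helpers `subtrees` and `to_string` are illustrative stand-ins for `get_all_subtrees()` and `tree_to_string()`, and the tree collapses each feature into a single terminal, which the real grammar does not do.

```python
# A derivation tree is a pair (symbol, children); terminals have no children.
tree = ("<req_list>", [
    ("<req>", [("<feature>", [("num(<term>)", [])]),
               (" ", []),
               ("<quant>", [("<=", [])]),
               (" ", []),
               ("<num>", [("13", [])])]),
    (", ", []),
    ("<req_list>", [
        ("<req>", [("<feature>", [("exists(<digit>)", [])]),
                   (" ", []),
                   ("<quant>", [(">", [])]),
                   (" ", []),
                   ("<num>", [("0.5", [])])])])])

def subtrees(node, symbol):
    """Collect all subtrees rooted at symbol, in left-to-right order."""
    name, children = node
    found = [node] if name == symbol else []
    for child in children:
        found += subtrees(child, symbol)
    return found

def to_string(node):
    """Concatenate the terminal symbols below node."""
    name, children = node
    return "".join(to_string(c) for c in children) if children else name

for req in subtrees(tree, "<req>"):
    print([to_string(subtrees(req, s)[0]) for s in ("<feature>", "<quant>", "<num>")])
```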

We implement a _Grammar-Based Input Generator_ that generates new input samples from a list of `InputSpecification` objects.
The input specifications are extracted from the decision tree boundaries in Step 3: _Extract Feature Requirements_.

An input specification consists of **1 to n** predicates or requirements (e.g. `feature >= value`, or `num(term) <= 13`).
We generate a new input for each `InputSpecification`.
The new input should fulfill all the requirements of its `InputSpecification`.

**Info:** For further details, please refer to <b>Sections 4.4 and 4.5</b> of the <a href="https://publications.cispa.saarland/3107/7/fse2020-alhazen.pdf">paper</a> and the chapter <b><a href="https://www.fuzzingbook.org/html/GrammarFuzzer.html">Efficient Grammar Fuzzing</a></b> in the fuzzingbook.

**INPUT**:
The function requires the following input parameters:
- grammar: the grammar that is used to produce new inputs (e.g. the CALCULATOR grammar)
- new_input_specifications: a list of new input specifications (List\[InputSpecification\])
- timeout: a maximum time budget. The function returns the generated inputs when the time budget is exceeded.

**OUTPUT**: The function returns a list of new inputs that are specified by the given input specifications.
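
Candidate inputs can be ranked by the fraction of requirements they fulfill. The following simplified sketch shows that ranking on plain dictionaries of feature values. The names `COMPARE` and `fulfilled_fraction` are assumptions for illustration; the implementation below additionally treats existence features specially.

```python
import operator

COMPARE = {"<": operator.lt, "<=": operator.le, ">": operator.gt, ">=": operator.ge}

def fulfilled_fraction(feature_values, requirements):
    """Fraction of (feature, quantifier, value) requirements met by feature_values."""
    met = sum(1 for feature, quant, value in requirements
              if COMPARE[quant](feature_values[feature], float(value)))
    return met / len(requirements)

requirements = [("exists(<function>@0)", ">", "0.5"), ("num(<term>)", "<=", "13")]
candidate_a = {"exists(<function>@0)": 1.0, "num(<term>)": 42.0}
candidate_b = {"exists(<function>@0)": 1.0, "num(<term>)": -3.0}

print(fulfilled_fraction(candidate_a, requirements))
print(fulfilled_fraction(candidate_b, requirements))
```

A candidate with fraction `1.0` satisfies the whole specification and can be returned immediately; otherwise, the candidates with the highest fraction are the most promising ones to keep.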

In [None]:
import time
import copy
from copy import deepcopy
import random
from itertools import chain

In [None]:
def best_trees(forest, spec):
    samples = [tree_to_string(tree) for tree in forest]
    fulfilled_fractions= []
    for sample in samples:
        gen_features = collect_features([sample], CALC_GRAMMAR)

        # calculate percentage of fulfilled requirements (used to rank the sample)
        fulfilled_count = 0
        total_count = len(spec.requirements)
        for req in spec.requirements:
            # for now, interpret requirement(exists(<...>) <= number) as false and requirement(exists(<...>) > number) as true
            if isinstance(req.feature, ExistenceFeature):
                expected = 1.0 if req.quant == '>' or req.quant == '>=' else 0.0
                actual = gen_features[req.feature.name][0]
                if actual == expected:
                    fulfilled_count += 1
                else:
                    pass
                    # print(f'{req.feature} expected: {expected}, actual:{actual}')
            elif isinstance(req.feature, NumericInterpretation):
                expected_value = float(req.value)
                actual_value = gen_features[req.feature.name][0]
                fulfilled = False
                if req.quant == '<':
                    fulfilled = actual_value < expected_value
                elif req.quant == '<=':
                    fulfilled = actual_value <= expected_value
                elif req.quant == '>':
                    fulfilled = actual_value > expected_value
                elif req.quant == '>=':
                    fulfilled = actual_value >= expected_value

                if fulfilled:
                    fulfilled_count += 1
                else:
                    pass
                    # print(f'{req.feature} expected: {expected_value}, actual:{actual_value}')
        fulfilled_fractions.append(fulfilled_count / total_count)
        # print(f'Fraction of fulfilled requirements: {fulfilled_count / total_count}')
    max_frac = max(fulfilled_fractions)
    best_chosen = []
    if max_frac == 1.0:
        return True, forest[fulfilled_fractions.index(1.0)]

    for i, t in enumerate(forest):
        if fulfilled_fractions[i] == max_frac:
            best_chosen.append(t)
    return False, best_chosen


In [None]:
# well, not perfect and probably not very robust. but it works :)
def generate_samples_advanced(grammar: Grammar,
                     new_input_specifications: List[InputSpecification],
                     timeout: int) -> List[str]:

    # if there are no input specifications: generate some random samples
    if len(new_input_specifications) == 0:
        fuzzer = GrammarFuzzer(grammar)
        samples = [fuzzer.fuzz() for _ in range(100)]
        return samples

    final_samples = []
    each_spec_timeout = timeout / len(new_input_specifications)

    rhs_nonterminals = grammar.keys()  # list(chain(*[nonterminals(expansion) for expansion in grammar[rule]]))

    fuzzer = GrammarFuzzer(grammar)

    for spec in new_input_specifications:
        done = False
        starttime = time.time()
        best_chosen = [fuzzer.fuzz_tree() for _ in range(100)]
        done, best_chosen = best_trees(best_chosen, spec)
        if done:
            final_samples.append(tree_to_string(best_chosen))

        while not done and time.time() - starttime < each_spec_timeout:
            # split in prefix, postfix and try to reach targets
            for tree in best_chosen:
                prefix_len = random.randint(1, 3)
                curr = tree
                parent_children = None  # child list that contains `curr`
                curr_idx = -1           # position of `curr` in that list
                valid = True
                for i in range(prefix_len):
                    nt, children = curr
                    poss_desc_idxs = []
                    for c_idx, c in enumerate(children):
                        s, _ = c
                        possible_descend = s in rhs_nonterminals
                        if possible_descend:
                            poss_desc_idxs.append(c_idx)
                    if len(poss_desc_idxs) < 1:
                        valid = False
                        break
                    desc = random.randint(0, len(poss_desc_idxs) - 1)
                    curr_idx = poss_desc_idxs[desc]
                    parent_children = children
                    curr = children[curr_idx]
                if valid:
                    nt, _ = curr
                    for req in spec.requirements:
                        if isinstance(req.feature, NumericInterpretation) and nt == req.feature.key:
                            # hacky: generate a derivation tree for this numeric interpretation
                            hacky_grammar = copy.deepcopy(grammar)
                            hacky_grammar["<start>"] = [nt]
                            parser = EarleyParser(hacky_grammar)
                            try:
                                test = parser.parse(req.value)
                                x = list(test)[0]
                                _, s = x
                                # replace curr in its parent's child list,
                                # so that the change becomes part of `tree`
                                curr = s[0]
                                parent_children[curr_idx] = curr
                            except SyntaxError:
                                pass
            done, best_chosen = best_trees(best_chosen, spec)
            if done:
                final_samples.append(tree_to_string(best_chosen))
        if not done:
            final_samples.extend([tree_to_string(t) for t in best_chosen])

    return final_samples
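
The core move in `generate_samples_advanced()` is to follow a random path into a derivation tree and splice in a subtree that satisfies a requirement. The following self-contained sketch shows that move on a hand-written tree. The helpers `tree_to_str()` and `splice_subtree()` are hypothetical names introduced for illustration only; the sketch assumes trees of the shape `(symbol, children)` with `children` being a list, as used above.

```python
import random
from typing import List, Tuple

# A derivation tree is a pair (symbol, children), as in the chapter's code
Tree = Tuple[str, List]


def tree_to_str(tree: Tree) -> str:
    """Concatenate the terminal symbols of `tree` (hypothetical helper)."""
    symbol, children = tree
    if not children:
        return symbol
    return "".join(tree_to_str(child) for child in children)


def splice_subtree(tree: Tree, target: str, replacement: Tree,
                   rng: random.Random, max_depth: int = 5) -> bool:
    """Follow one random path of nonterminal children; replace the first
    node labeled `target` in place. Return True if a node was replaced."""
    children = tree[1]
    for _ in range(max_depth):
        candidates = [idx for idx, child in enumerate(children)
                      if child[0].startswith("<")]
        if not candidates:
            return False  # dead end: only terminals below this node
        idx = rng.choice(candidates)
        if children[idx][0] == target:
            children[idx] = replacement  # mutate the parent's child list
            return True
        children = children[idx][1]
    return False


# Hand-written tree for the input "sqrt(4)"
tree = ("<start>", [("<function>", [("sqrt", [])]),
                    ("(", []),
                    ("<term>", [("4", [])]),
                    (")", [])])
new_term = ("<term>", [("-32", [])])

rng = random.Random(0)
# A single random walk may end inside <function>, so retry a few times
for _ in range(100):
    if splice_subtree(tree, "<term>", new_term, rng):
        break

print(tree_to_str(tree))
```

Note that the replacement is written into the parent's child list; merely rebinding a local variable to the new subtree would leave the original tree unchanged.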

In [None]:
generate_samples = generate_samples_advanced

Here is a simpler generator function. It ignores the input specifications and returns `num` randomly generated inputs:

In [None]:
def generate_samples_random(grammar, new_input_specifications, num):
    f = GrammarFuzzer(grammar, max_nonterminals=50, log=False)
    data = []
    for _ in range(num):
        new_input = f.fuzz()
        data.append(new_input)

    return data

### Excursion: Some Tests

In [None]:
exsqrt = ExistenceFeature('exists(<function>@0)', '<function>', 'sqrt')
exdigit = ExistenceFeature('exists(<digit>)', '<digit>', '<digit>')

reqDigit = SpecRequirement(exdigit, '>', '0.5')
fbdDigit = SpecRequirement(exdigit, '<=', '0.5')

req0 = SpecRequirement(exsqrt, '>', '-6.0')
testspec0 = InputSpecification([req0, reqDigit])
req1 = SpecRequirement(exsqrt, '<=', '-6.0')
testspec1 = InputSpecification([req1, fbdDigit])

numterm = NumericInterpretation('num(<term>)', '<term>')
req2 = SpecRequirement(numterm, '<', '-31.0')
testspec2 = InputSpecification([req2, req0, reqDigit])

print('--generating samples--')
# samples = generate_samples(CALC_GRAMMAR, [testspec0, testspec1], 10)
samples = generate_samples_advanced(CALC_GRAMMAR, [testspec2], 10)
samples
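
To read requirements such as `SpecRequirement(exdigit, '>', '0.5')`, it helps to spell out what fulfilling one means. The sketch below assumes that a requirement holds if the numeric value of the feature compares to the threshold as the quantifier says; the helper `fulfills()` and the table `COMPARATORS` are hypothetical and not part of the chapter's code, whose own check (inside `best_trees()`) may differ in detail.

```python
import operator
from typing import Callable, Dict

# Comparison operators as they appear in requirements such as
# SpecRequirement(exdigit, '>', '0.5')
COMPARATORS: Dict[str, Callable[[float, float], bool]] = {
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}


def fulfills(feature_value: float, quantifier: str, threshold: str) -> bool:
    """Check `feature_value <quantifier> threshold` (hypothetical helper).
    The threshold is a string, as in the requirements above."""
    return COMPARATORS[quantifier](feature_value, float(threshold))


# exists(<digit>) is 1 if the input contains a digit
print(fulfills(1, ">", "0.5"))

# The threshold itself does not satisfy a strict comparison
print(fulfills(-31.0, "<", "-31.0"))
```

Under this reading, an existence feature with value 1 satisfies `> 0.5`, whereas a `<term>` that evaluates to exactly -31.0 does not satisfy `num(<term>) < -31.0`.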

### End of Excursion

## Step 5: Executing New Input Files

### The Alhazen Class

In [None]:
class Alhazen:
    def __init__(self,
                 runner: Any,
                 grammar: Grammar,
                 initial_inputs: List[str], /,
                 max_iterations: int = 10,
                 generator_timeout: int = 10):

        self._initial_inputs = initial_inputs
        self._grammar = grammar
        self._runner = runner
        self._max_iter = max_iterations
        self._previous_samples = None
        self._data = None
        self._trees = []
        self._generator_timeout = generator_timeout
        self._setup()

In [None]:
class Alhazen(Alhazen):
    def _setup(self):
        self._previous_samples = self._initial_inputs

        self._all_features = extract_all_features(self._grammar)
        self._feature_names = [f.name for f in self._all_features]
        print("Features:", ", ".join(f.friendly_name()
                                     for f in self._all_features))

In [None]:
class Alhazen(Alhazen):
    def _add_new_data(self, exec_data, feature_data):
        joined_data = exec_data.join(feature_data.drop(['sample'], axis=1))

        # Only add valid data: drop all samples whose oracle is UNDEF
        new_data = joined_data.drop(joined_data[joined_data.oracle.astype(str) == "UNDEF"].index)
        if 0 != len(new_data):
            if self._data is None:
                self._data = new_data
            else:
                self._data = pandas.concat([self._data, new_data], sort=False)

In [None]:
class Alhazen(Alhazen):
    def _finalize(self):
        return self._trees

In [None]:
class Alhazen(Alhazen):
    def execute_samples(self, sample_list=None):
        if sample_list is None:
            sample_list = self._initial_inputs

        data = []
        for sample in sample_list:
            result = self._runner(sample)
            data.append({"oracle": result})

        return pandas.DataFrame.from_records(data)
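
The two methods above label each sample by running it and keep only samples with a definite verdict. The following sketch illustrates this without pandas. The enumeration `Outcome` is a hypothetical stand-in for `OracleResult`, and `toy_oracle()` is an assumed runner that treats a math domain error as the failure of interest; neither is part of the chapter's code.

```python
import math
from enum import Enum
from typing import List, Tuple


class Outcome(Enum):
    """Hypothetical stand-in for the chapter's OracleResult."""
    BUG = "BUG"
    NO_BUG = "NO_BUG"
    UNDEF = "UNDEF"


def toy_oracle(sample: str) -> Outcome:
    """Run a toy 'program' (a square root) on `sample` and label the result."""
    try:
        value = float(sample)
    except ValueError:
        return Outcome.UNDEF   # not a number: no verdict possible
    try:
        math.sqrt(value)
    except ValueError:
        return Outcome.BUG     # math domain error for negative input
    return Outcome.NO_BUG


samples = ["4", "-16", "abc"]

# Step 1: execute all samples and record their labels
labeled: List[Tuple[str, Outcome]] = [(s, toy_oracle(s)) for s in samples]

# Step 2: only keep samples with a definite verdict
valid = [(s, outcome) for s, outcome in labeled
         if outcome is not Outcome.UNDEF]

for sample, outcome in valid:
    print(sample, outcome.value)
```

Dropping undefined outcomes keeps the learner from treating, say, rejected inputs as evidence for or against the failure.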

In [None]:
class Alhazen(Alhazen):
    def run(self):
        for iteration in range(self._max_iter):
            print(f"\nIteration #{iteration}")
            self._iterate(self._previous_samples)

        return self._finalize()

In [None]:
class Alhazen(Alhazen):
    def _iterate(self, sample_list):
        # Obtain labels, execute samples (Initial Step)
        exec_data = self.execute_samples(sample_list)

        # Collect features from the new samples
        feature_data = collect_features(sample_list, self._grammar)

        # Combine the new data with the already existing data
        self._add_new_data(exec_data, feature_data)
        # display(self._data)

        # Train a tree
        dec_tree = train_tree(self._data)
        self._trees.append(dec_tree)

        # Extract new requirements from the tree (Activity 3)
        new_input_specifications = get_all_input_specifications(dec_tree,
                                                self._all_features,
                                                self._feature_names,
                                                self._data.drop(['oracle'], axis=1))
        print("  New input specifications:")
        for spec in new_input_specifications:
            print(f"    {spec.friendly()}")

        # generate new inputs according to the new input specifications
        new_samples = generate_samples(self._grammar,
                                       new_input_specifications,
                                       self._generator_timeout)
        print("  New samples:")
        print(f"    {', '.join(new_samples)}")
        self._previous_samples = new_samples
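
The loop in `_iterate()` (execute, learn, derive new inputs, repeat) can be illustrated on a much smaller problem. The sketch below is not Alhazen: it replaces the decision tree by a single learned threshold over integer inputs, and the names `toy_runner()`, `learn_threshold()`, and `refine()` are hypothetical. It assumes that the initial inputs contain at least one passing and one failing input, just as the initial samples `sqrt(-16)` and `sqrt(4)` do.

```python
import math
from typing import Dict, List


def toy_runner(x: int) -> bool:
    """Hypothetical program under test: it 'fails' (True) for inputs above 42."""
    return x > 42


def learn_threshold(data: Dict[int, bool]) -> float:
    """A one-node 'decision tree': split halfway between the largest
    passing input and the smallest failing input seen so far."""
    largest_passing = max(x for x, failed in data.items() if not failed)
    smallest_failing = min(x for x, failed in data.items() if failed)
    return (largest_passing + smallest_failing) / 2


def refine(initial_inputs: List[int], iterations: int = 10) -> float:
    data: Dict[int, bool] = {}
    samples = list(initial_inputs)
    threshold = math.nan
    for _ in range(iterations):
        # Execute the current samples and record their labels
        for x in samples:
            data[x] = toy_runner(x)
        # Learn a model from all data collected so far
        threshold = learn_threshold(data)
        # Generate new inputs right at the learned decision boundary
        samples = [math.floor(threshold), math.ceil(threshold)]
    return threshold


print(refine([0, 100]))
```

Each round tests inputs next to the current decision boundary, so the boundary moves toward the actual failure condition until it no longer changes. Alhazen follows the same pattern, with decision trees over input features instead of a single threshold.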

### A Sample Run

We can finally run _Alhazen_!

In [None]:
# Set the number of refinement iterations and the timeout for the input generator
# The execution time of Alhazen mainly depends on the number of iterations

MAX_ITERATIONS = 20
GENERATOR_TIMEOUT = 10 # timeout in seconds

In [None]:
# We initialize Alhazen with the previously used sample_list (['sqrt(-16)', 'sqrt(4)'])
alhazen = Alhazen(sample_runner, CALC_GRAMMAR, initial_sample_list,
                  max_iterations=MAX_ITERATIONS,
                  generator_timeout=GENERATOR_TIMEOUT)

# and run it
# Alhazen returns a list of all the iteratively learned decision trees
trees = alhazen.run()

Let's display the final decision tree learned by Alhazen. You can use the function `show_decision_tree(decision_tree, feature_names)` to display it.

In [None]:
final_tree = trees[-1]  # the tree learned in the last iteration
final_tree

In [None]:
all_features = extract_all_features(CALC_GRAMMAR)
all_feature_names = [f.friendly_name() for f in all_features]

In [None]:
show_decision_tree(final_tree, all_feature_names)

**Info:** The decision tree may contain unnecessarily long paths along which the predicted bug class does not change. You can use the function `remove_unequal_decisions(decision_tree)` to remove the nodes on such paths.

In [None]:
show_decision_tree(remove_unequal_decisions(final_tree), all_feature_names)
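
The idea behind such a simplification can be sketched on a toy tree. The sketch assumes that a decision node is redundant if both of its subtrees end up predicting the same class. The tuple-based tree representation and the function `prune()` are hypothetical; they are unrelated to the internal representation of the trees learned above and to the actual implementation of `remove_unequal_decisions()`.

```python
from typing import Tuple, Union

# A leaf is a class label; an inner node is (condition, if_true, if_false)
Node = Union[str, Tuple[str, "Node", "Node"]]


def prune(node: Node) -> Node:
    """Collapse every decision whose two branches predict the same class."""
    if isinstance(node, str):
        return node  # leaf: nothing to simplify
    condition, if_true, if_false = node
    if_true, if_false = prune(if_true), prune(if_false)
    if isinstance(if_true, str) and if_true == if_false:
        return if_true  # the condition does not affect the prediction
    return (condition, if_true, if_false)


toy_tree: Node = ("num(<term>) <= 0",
                  ("exists(sqrt)", "BUG", "NO_BUG"),
                  ("exists(<digit>)", "NO_BUG", "NO_BUG"))

pruned = prune(toy_tree)
print(pruned)
```

In the toy tree, the check for `exists(<digit>)` disappears because both of its branches predict `NO_BUG`, while the check for `exists(sqrt)` stays.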

In [None]:
print(friendly_decision_tree(final_tree, all_feature_names))

## Lessons Learned

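* Machine learning can associate the failure of a program with syntactic features of its input.
* A decision tree trained on these features explains the circumstances under which the failure occurs.
* By generating new inputs from the learned tree, executing them, and retraining, Alhazen iteratively refines its diagnosis.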

## Next Steps

* [use _assertions_ to check conditions at runtime](Assertions.ipynb)
* [reduce _failing inputs_ for efficient debugging](DeltaDebugger.ipynb)


## Background

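This chapter builds on the Alhazen approach by Kampmann et al. [[KHSZ20](https://publications.cispa.saarland/3107/7/fse2020-alhazen.pdf)], which learns associations between program failures and syntactic features of the input.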

The idea of ensuring that each expansion in the grammar is used at least once goes back to Burkhardt \cite{Burkhardt1967}, to be later rediscovered by Paul Purdom \cite{Purdom1972}.