# Generating Samples

In this notebook we will show how to generate valid samples for a given parser without using a grammar.

## Examples

First we import the convenience utilities.

In [None]:
import src.utils as utils

### Calculator.py

In [None]:
# Load the calculator subject; the resulting object exposes `main(...)`, which
# the cells below call with candidate inputs.
calculator = utils.load_file('subjects/calculator.py', 'calculator')

In [None]:
#%load subjects/calculator.py

## The error handler

We often need to interpret the error we get back. We use a simple context manager to capture the error.

In [None]:
# 'xyz' is not an arithmetic expression. ExpectError wraps the call so the
# failure can be inspected here (presumably it reports and suppresses the
# exception; confirm in src/utils).
with utils.ExpectError():
     calculator.main('xyz')

## A random fuzzer.

In [None]:
import random
random.seed(0)

In [None]:
import string

In [None]:
def fuzzer(max_length=100):
    """Return a random string of printable characters.

    The length is drawn uniformly from 1..max_length (inclusive); every
    character is then drawn independently from `string.printable`.
    """
    n_chars = random.randrange(1, max_length + 1)
    picked = []
    for _ in range(n_chars):
        picked.append(random.choice(string.printable))
    return ''.join(picked)

In [None]:
# One sample with the default maximum length of 100.
fuzzer()

What happens if you feed this input to the program?

In [None]:
# Feed one random string to the calculator. It is printed through repr()
# first so that whitespace and control characters stay visible.
with utils.ExpectError():
    s = fuzzer()
    print(repr(s))
    calculator.main(s)

This is rather unsatisfying. We need a better way to reach deeper into the program. Let us observe the error again, this time with a plausible partial input.

In [None]:
# A well-formed prefix, '(1+2)', followed by the stray characters 'de'.
with utils.ExpectError():
     calculator.main('(1+2)de')

As you can see, the exception indicates exactly where the parse error occurred.

In [None]:
# The first five characters of the input above: the well-formed prefix '(1+2)'.
'(1+2)de'[0:5]

## Adding Feedback

Can we make use of the feedback from the parser to construct better inputs?

In [None]:
import enum

In [None]:
class ExprStatus(enum.Enum):
    """Outcome of one parse attempt, as recorded by ExpectExprError."""
    # The parser finished without raising.
    Complete = 0
    # The reported error position is at or past the end of the input.
    Unterminated = -1
    # The reported error position falls inside the input.
    Unexpected = -2

In [None]:
class ExpectExprError:
    """Context manager that classifies the outcome of parsing `s`.

    Run the parser inside the `with` body. Afterwards:

    * `result` holds an `ExprStatus` member describing the outcome;
    * `boundary` holds the error position carried by the exception, or 0
      when the body finished without raising.

    Only exceptions whose `args` are an `(input, position)` pair with a
    numeric position are consumed. Any other exception propagates unchanged,
    leaving `result` and `boundary` as `None`.
    """

    def __init__(self, s, log=False):
        self.boundary = None
        self.result = None
        self.s = s
        # Kept for callers that pass it; this class does not consult it.
        self.log = log

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        if exc_type is None:
            self.boundary = 0
            self.result = ExprStatus.Complete
            return False

        # Unrelated failures (e.g. a ZeroDivisionError with a single argument)
        # do not carry a position. Let them surface as themselves instead of
        # failing here with an unpacking error.
        args = getattr(exc_value, 'args', ())
        if len(args) != 2 or not isinstance(args[1], (int, float)):
            return False

        self.boundary = args[1]
        if self.boundary >= len(self.s):
            # The parser consumed everything and still wanted more input.
            self.result = ExprStatus.Unterminated
        else:
            # The parser stopped at a character inside the input.
            self.result = ExprStatus.Unexpected
        return True

In [None]:
# A stray 'x' inside the parentheses. The cell displays (boundary, result).
with ExpectExprError('(1+2x)') as e:
     calculator.main(e.s)
e.boundary, e.result

In [None]:
# The closing parenthesis is missing. The cell displays (boundary, result).
with ExpectExprError('(1+2') as e:
     calculator.main(e.s)
e.boundary, e.result

In [None]:
# A balanced expression. The cell displays (boundary, result).
with ExpectExprError('(1+2)') as e:
     calculator.main(e.s)
e.boundary, e.result

In [None]:
# A balanced expression followed by a stray 'x'. The cell displays
# (boundary, result).
with ExpectExprError('(1+2)x') as e:
     calculator.main(e.s)
e.boundary, e.result

## Building the Evolutionary Algorithm

In [None]:
def get_expr_fitness(s):
    """Score `s` by how the calculator's parser reacts to it; lower is better.

    * Complete parse: 1.0 / len(s), so longer valid inputs score lower.
    * Unexpected character: the number of characters from the reported
      error position to the end of the input.
    * Unterminated input: 1.
    """
    with ExpectExprError(s) as outcome:
        calculator.main(outcome.s)

    status = outcome.result
    if status == ExprStatus.Complete:
        return 1.0/len(outcome.s)
    if status == ExprStatus.Unexpected:
        return len(outcome.s) - outcome.boundary
    if status == ExprStatus.Unterminated:
        return 1
    # Every ExprStatus member is handled above.
    assert False, (s, outcome)

In [None]:
class Evolver:
    """Base of the evolutionary search; later cells add the GA operators.

    Parameters
    ----------
    fitness_fn : callable, optional
        Maps a candidate string to a number. May be omitted by subclasses
        that override `get_fitness`.
    delta : float
        Threshold stored for the search loop, which keeps going while the
        best fitness is above it.
    log : bool
        Whether the search prints its progress.
    """

    def __init__(self, fitness_fn=None, delta=0.1, log=True):
        self.fitness_fn = fitness_fn
        self.log = log
        self.delta = delta

    def get_fitness(self, s):
        """Return the fitness of `s` as computed by `fitness_fn`.

        Raises
        ------
        NotImplementedError
            If no `fitness_fn` was supplied and the method was not overridden.
        """
        if self.fitness_fn is None:
            raise NotImplementedError(
                "no fitness_fn was supplied; pass one to the constructor "
                "or override get_fitness() in a subclass")
        return self.fitness_fn(s)

In [None]:
# Use the free function get_expr_fitness as the fitness measure.
expr_evolver = Evolver(get_expr_fitness)

In [None]:
# Five characters: a Complete parse would score 1.0/5.
expr_evolver.get_fitness('(1+2)')

In [None]:
# Eleven characters: a Complete parse would score 1.0/11, i.e. lower (better)
# than the shorter expression above.
expr_evolver.get_fitness('(1+(2*4+4))')

In [None]:
# Digits directly after a balanced expression. If the parser reports an
# Unexpected character at position b, the score is 8 - b.
expr_evolver.get_fitness('(1+2)234')

In [None]:
# Missing closing parenthesis: an Unterminated result scores 1.
expr_evolver.get_fitness('(1+2+3')

In [None]:
# Nine characters ending in letters. If the parser reports an Unexpected
# character at position b, the score is 9 - b.
expr_evolver.get_fitness('(1+2+3XXY')

In [None]:
class Evolver(Evolver):
    def create_population(self, size):
        """Return a list of `size` random strings, each produced by fuzzer()."""
        individuals = []
        for _ in range(size):
            individuals.append(fuzzer())
        return individuals

In [None]:
# Rebind to an instance of the class just extended above, so the new method
# is available on it.
expr_evolver = Evolver(get_expr_fitness)

In [None]:
# Ten random strings from fuzzer().
expr_evolver.create_population(10)

In [None]:
class Evolver(Evolver):
    def evaluate_population(self, population):
        """Return a list of (individual, fitness) pairs in population order."""
        return [(individual, self.get_fitness(individual))
                for individual in population]

In [None]:
# Rebind to an instance of the class just extended above, so the new method
# is available on it.
expr_evolver = Evolver(get_expr_fitness)

In [None]:
# Score 100 random inputs; the cell displays the (input, fitness) pairs.
population = expr_evolver.create_population(100)
expr_evolver.evaluate_population(population)

In [None]:
class Evolver(Evolver):
    def selection(self, evaluated_population, tournament_size):
        """Tournament selection: return the best of a random sample.

        `evaluated_population` is a sequence of (individual, fitness) pairs.
        `tournament_size` of them are drawn without replacement, and the
        individual with the lowest fitness is returned.
        """
        contenders = random.sample(evaluated_population, tournament_size)
        fittest = min(contenders, key=lambda entry: entry[1])
        champion = fittest[0]
        assert champion
        # Hand back a copy so callers never share the stored individual.
        return champion[:]

In [None]:
class Evolver(Evolver):
    def crossover(self, parent1, parent2):
        """Single-point crossover of two non-empty parents.

        The cut position is drawn from 1..len(parent1) inclusive, and the
        tails after that position are exchanged. Returns the two offspring
        as a tuple.
        """
        assert parent1
        assert parent2
        cut = random.randint(1, len(parent1))

        head1, tail1 = parent1[:cut], parent1[cut:]
        head2, tail2 = parent2[:cut], parent2[cut:]
        child1 = head1 + tail2
        child2 = head2 + tail1
        assert child1
        assert child2

        return (child1, child2)

In [None]:
class Evolver(Evolver):
    def mutate(self, chromosome):
        """Return a copy of `chromosome` with characters randomly perturbed.

        Each position mutates with probability 1/len(chromosome). A mutated
        character's code point is redrawn from a Gaussian centred on the old
        code point (standard deviation 100), wrapped into 0..65535.
        """
        assert chromosome
        rate = 1.0 / len(chromosome)
        genes = list(chromosome)

        for idx, gene in enumerate(genes):
            if random.random() < rate:
                genes[idx] = chr(int(random.gauss(ord(gene), 100) % 65536))
        return ''.join(genes)

In [None]:
class Evolver(Evolver):
    def genetic_algorithm(self, population_size=100, max_generations=1000,
                          tournament_size=10, crossover_rate=0.7):
        """Evolve random strings until one is fit enough; lower fitness wins.

        Parameters
        ----------
        population_size : int
            Number of individuals per generation (at least 1).
        max_generations : int
            Upper bound on the number of generations bred.
        tournament_size : int
            Individuals drawn per selection (at least 1). Capped at the
            population size.
        crossover_rate : float
            Probability that a selected pair is recombined.

        The defaults reproduce the values that used to be hard-coded.

        Returns
        -------
        tuple
            (best_individual, best_fitness) of the last generation evaluated.

        Raises
        ------
        ValueError
            If `population_size` or `tournament_size` is below 1.
        """
        if population_size < 1:
            raise ValueError("population_size must be at least 1")
        if tournament_size < 1:
            raise ValueError("tournament_size must be at least 1")

        def best_of(evaluated):
            # Lowest fitness value is the best individual.
            return min(evaluated, key=lambda item: item[1])

        generation = 0
        population = self.create_population(population_size)
        fitness = self.evaluate_population(population)
        best_individual, best_fitness = best_of(fitness)
        if self.log:
            print("Best fitness of initial population: %s - %.10f" % (repr(best_individual), best_fitness))

        # A tournament cannot draw more contenders than the population holds.
        contenders = min(tournament_size, len(fitness))

        # Stop when optimum found, or we run out of patience
        while best_fitness > self.delta and generation < max_generations:
            if self.log:
                print('.', best_fitness)
            # The next generation will have the same size as the current one
            new_population = []
            while len(new_population) < len(population):
                # Selection
                offspring1 = self.selection(fitness, contenders)
                offspring2 = self.selection(fitness, contenders)

                # Crossover
                if random.random() < crossover_rate:
                    (offspring1, offspring2) = self.crossover(offspring1, offspring2)

                # Mutation
                offspring1 = self.mutate(offspring1)
                offspring2 = self.mutate(offspring2)

                new_population.append(offspring1)
                new_population.append(offspring2)

            # Offspring arrive in pairs; drop the surplus one for odd sizes.
            del new_population[len(population):]

            # Once full, the new population replaces the old one
            generation += 1
            population = new_population
            fitness = self.evaluate_population(population)

            best_individual, best_fitness = best_of(fitness)
            if self.log:
                print("Best fitness at generation %d: %s - %.8f" % (generation, repr(best_individual), best_fitness))

        if self.log:
            print("Best individual: %s, fitness %.10f" %(repr(best_individual), best_fitness))
        return best_individual, best_fitness

In [None]:
# Rebind to an instance of the fully assembled class (selection, crossover,
# mutation and the search loop were added in the cells above).
expr_evolver = Evolver(get_expr_fitness)

In [None]:
# Logging is on by default, so progress is printed for every generation.
# The cell displays (best_individual, best_fitness).
expr_evolver.genetic_algorithm()

In [None]:
class ExprEvolver(Evolver):
    """Evolver with the calculator fitness built in (no fitness_fn needed)."""

    def get_fitness(self, s):
        """Score `s` by how the calculator's parser reacts; lower is better.

        * Complete parse: 1.0 / len(s).
        * Unexpected character: characters from the reported error position
          to the end of the input.
        * Unterminated input: 1.
        """
        with ExpectExprError(s, log=self.log) as outcome:
            calculator.main(outcome.s)

        status = outcome.result
        if status == ExprStatus.Complete:
            return 1.0/len(outcome.s)
        if status == ExprStatus.Unexpected:
            return len(outcome.s) - outcome.boundary
        if status == ExprStatus.Unterminated:
            return 1
        # Every ExprStatus member is handled above.
        assert False, (s, outcome)

In [None]:
# ExprEvolver overrides get_fitness, so no fitness function is passed.
# log=False turns off the per-generation progress output.
expr_evolver = ExprEvolver(log=False)

In [None]:
# Ten independent searches; each prints its (best_individual, best_fitness).
for i in range(10):
    v = expr_evolver.genetic_algorithm()
    print(repr(v))

### JSON
Generating JSON can be slow, so run this section only if you have enough time.

In [None]:
class JStatus(enum.Enum):
    """Outcome of one JSON parse attempt, as recorded by ExpectJSONError."""
    # The parser finished without raising.
    Complete = 0
    # Error message starts with 'extra data after JSON at position'.
    Extra = 1
    # Error message starts with 'truncated JSON data at position'.
    Unterminated = -1
    # Any other error message.
    Expecting = -2

In [None]:
import subjects.microjson as microjson

In [None]:
class ExpectJSONError:
    """Context manager that classifies the outcome of a JSON parse attempt.

    Run the parser inside the `with` body. Afterwards:

    * `result` holds a `JStatus` member describing the outcome;
    * `boundary` holds the exception's `pos` attribute, or 0 when the body
      finished without raising;
    * `msg` holds the text of the exception, or None when there was none.

    Only exceptions that carry a `pos` attribute are consumed. Any other
    exception propagates unchanged, leaving `result` and `boundary` as None.
    """

    def __init__(self, s=None, log=False):
        self.msg = None
        self.boundary = None
        self.result = None
        self.s = s
        self.log = log

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        if exc_type is None:
            self.boundary = 0
            self.result = JStatus.Complete
            return False

        msg = str(exc_value)
        self.msg = msg
        if self.log:
            # Imported here so the class works regardless of which notebook
            # cells have been run.
            import sys
            print(msg, file=sys.stderr)

        # Without a position there is nothing to classify; let the original
        # exception surface instead of failing here with an AttributeError.
        position = getattr(exc_value, 'pos', None)
        if position is None:
            return False

        if msg.startswith('extra data after JSON at position'):
            self.result = JStatus.Extra
        elif msg.startswith('truncated JSON data at position'):
            self.result = JStatus.Unterminated
        else:
            # Covers 'malformed JSON data at position',
            # 'missing colon after key at position',
            # 'expected null at position', 'expected boolean at position',
            # and, in the interest of simplicity, every other message.
            self.result = JStatus.Expecting
        self.boundary = position
        return True

In [None]:
# Random inputs kept as regression samples; each comment records the error
# message that was observed for the string below it.
error_data = [
    #expected null at position 0, "'n%m\ri<Q8P<t{STo~V&iH|_pJu}8_*fB\r'"
    'n%m\ri<Q8P<t{STo~V&iH|_pJu}8_*fB\r',
    # expected boolean at position 0, "'tWI6n )AB/'"
    'tWI6n )AB/',
    # missing colon after key at position 36, "'fn1+"AC8fwp{@cQ'"
    # NOTE(review): the string below is only 15 characters long, so
    # "position 36" cannot refer to it. The comment looks stale; re-run and
    # update it.
    'fn1+"AC8fwp{@cQ'
]

In [None]:
# Run every recorded sample through the parser and print the position and
# status that ExpectJSONError derives from the error.
for x in error_data:
    with ExpectJSONError(x) as e:
        microjson.main(e.s)
    print(e.boundary, e.result)

In [None]:
# A complete array followed by stray characters. The input is handed to the
# context manager so that e.s records what was parsed.
with ExpectJSONError('["abc"]de') as e:
    microjson.main(e.s)
e.boundary, e.result

In [None]:
# The string and the array are both left open. The input is handed to the
# context manager so that e.s records what was parsed.
with ExpectJSONError('["abc') as e:
    microjson.main(e.s)
e.boundary, e.result

In [None]:
# Bare letters where a JSON value should start. The input is handed to the
# context manager so that e.s records what was parsed.
with ExpectJSONError('[ab') as e:
    microjson.main(e.s)
e.boundary, e.result

In [None]:
# A well-formed array. The input is handed to the context manager so that
# e.s records what was parsed.
with ExpectJSONError('[1,2,3]') as e:
    microjson.main(e.s)
e.boundary, e.result

In [None]:
class JSONEvolver(Evolver):
    def get_fitness(self, s):
        with ExpectJSONError(s, self.log) as e:
            microjson.main(e.s)
        match e.result:
            case JStatus.Complete:
                return 1.0/len(e.s)
            case JStatus.Extra:
                return len(s) - e.boundary
                # better to be incomplete than incorrect.
                return len(s) * 0.1
            case JStatus.Expecting:
                if len(s) == e.boundary:
                    return 1
                return len(s) - e.boundary
            case JStatus.Unterminated:
                return 1
        assert False, (s, e)

In [None]:
# JSONEvolver overrides get_fitness, so no fitness function is passed.
# log=False turns off the per-generation progress output.
json_evolver = JSONEvolver(log=False)

**Can be really slow**

In [None]:
# Ten independent searches; each prints its (best_individual, best_fitness).
for i in range(10):
    v = json_evolver.genetic_algorithm()
    print(repr(v))

# Done

In [None]:
#%tb