## Hyper-params

In [1]:
POPULATION_SIZE = 100
MAX_DEPTH = 7;
TOURNAMENT_SIZE = 2
XOVER_APP_RATE = 80
MUTATION_APP_RATE = 20
GENERATIONS = 50
FITNESS_THRESHOLD = 0.01
APPLICATION_RATES = [XOVER_APP_RATE/MUTATION_APP_RATE, 1]
MAX_RANDOM = 5
MIN_RANDOM = 1
RUNS = 10
TRAINING_SET_SPLIT = 0.7 # out of 1

# Definition

## Imports and Boilerplate

In [2]:
import os
import subprocess
import random
from collections.abc import Callable
from collections.abc import Iterable
from functools import reduce
import pandas as pd
import numpy as np
import copy
import time
import math

In [3]:
%matplotlib inline

## Seeding random numbers

In [4]:
random.seed(257)

## Primitive, Terminal and Func classes

In [5]:
class Primitive():
    def __init__(self, symbol: str, arity: int = 0):
        self.symbol = symbol
        
    def print():
        print(self.symbol)

In [6]:
class Terminal(Primitive):
    def __init__(self, symbol: str, arity: int = 0):
        self.symbol = symbol

In [7]:
class RandomEphemeralConstant(Terminal):
    def __init__(self, symbol = 'R', arity: int = 0):
        self.symbol = random.uniform(MIN_RANDOM, MAX_RANDOM + 1)

In [8]:
class Func(Primitive):
    def __init__(self, symbol: str, arity: int, func: Callable[..., ...]):
        self.symbol = symbol
        self.arity = arity
        self.func = func
        
    def exec(self, *args):
        return self.func(*args)

## Node and Leaf classes

In [9]:
class Node():
    def __init__(self, data: Primitive = None, *children):
        self.children = list(children)
        self.data = data

    def add_children(self, *children):
        if self.full(): raise Exception('Node is full')
        self.children.extend(children)

    def set_data(self, data):
        self.data = data

    def max_children(self):
        return self.data.arity

    def full(self):
        return self.max_children() == self.degree()

    def depth(self):
        if not len(self.children): return 0
        
        return 1 + max(child.depth() for child in self.children)
        
    def degree(self):
        return len(self.children)

    def exec(self, case: Iterable):
        func = self.data
        args = [child.exec(case) for child in self.children]
        return func.exec(*args)

    def node_count(self):
        return 1 + sum (child.node_count() for child in self.children)

    '''
        0 is the root
    '''
    def get_node_at(self, pos, index = 0):
        if pos <= 0: return None
        for child in self.children:
            index = index + 1
            if index == pos:
                return child, index
            else:
                node, index = child.get_node_at(pos, index)
                if node: return node, index
                else: continue
        return None, index

    def set_node_at(self, pos, node, index = 0):
        if pos <= 0: return None
        for i, child in enumerate(self.children):
            index = index + 1
            if index == pos:
                prev_node = child
                self.children[i] = node
                return prev_node, index
            else:
                prev_node, index = child.set_node_at(pos, node, index)
                if prev_node: return prev_node, index
                else: continue
        return None, index

    def is_oversized(self, max_depth = MAX_DEPTH):
        return self.depth() > max_depth

    def prune(self, depth = 1, max_depth = MAX_DEPTH):
        if depth == max_depth:
            self.children = [Leaf(t) for t in random.choices(terminal_set, k = self.data.arity)]
            return

        for child in self.children:
            child.prune(depth + 1, max_depth)
    
    def print(self):
        print(self.to_pretty_str())
        
    def to_str(self, depth = 0):
        str = f"{self.data.symbol} ( "
        
        for i, child in enumerate(self.children):
            str += child.to_str(depth + 1)
            if i < self.degree() - 1: str += ', '
                
        str += f" )"

        return str
        
    def to_pretty_str(self, depth = 0):
        str = f"{self.data.symbol} ("
        
        for i, child in enumerate(self.children):
            str += f"\n{(depth + 1) * '\t'}"
            str += child.to_pretty_str(depth + 1)
            if i < self.degree() - 1: str += ','
                
        str += f"\n{depth * '\t'})"

        return str

In [10]:
class Leaf(Node):
    def __init__(self, data: Primitive):
        self.data = data

    def depth(self):
        return 0;

    def max_children(self):
        return 0;

    def full(self):
        return True;

    def exec(self, case: Iterable):
        terminal = self.data
        
        if isinstance(terminal, RandomEphemeralConstant): return terminal.symbol
            
        return case[terminal.symbol]

    def node_count(self):
        return 1

    def get_node_at(self, pos, pointer = 0):
        return None, pointer
        
    def set_node_at(self, pos, node: Node, pointer = 0):
        return None, pointer

    def prune(self, depth = 1, max_depth = MAX_DEPTH):
        return

    def print(self):
        print(self.to_pretty_str())
        
    def to_pretty_str(self, depth = 0):
        return self.data.symbol
        
    def to_str(self, depth = 0):
        return str(self.data.symbol)     

## Initial Population Generation

In [11]:
def grow(node: Node, depth: int = 1, max_depth = MAX_DEPTH):
    TERMINAL = 0;
    FUNCTION = 1;
    
    primitive = random.choice([0,1])

    if node.full(): return

    if(primitive == TERMINAL or depth >= max_depth):
        terminal = random.choice(terminal_set)
        
        child = None
        
        if isinstance(terminal, RandomEphemeralConstant): child = Leaf(RandomEphemeralConstant())
            
        else: child = Leaf(terminal)
            
        node.add_children(child)
        if not node.full():
            grow(node, depth, max_depth)
    
    if(primitive == FUNCTION and depth < max_depth):
        child = Node(random.choice(function_set))
        node.add_children(child)
        grow(child, depth + 1, max_depth)
        if not node.full():
            grow(node, depth, max_depth)

In [12]:
def is_duplicate(population: [str], indiv: Node) -> bool:
    for i in population:
        if i == indiv.to_str(): return True
    return False

In [13]:
def init_population(size = 50, method = 'grow') -> ([Node], []):
    population = []
    population_expressions = []
    
    for i in range(size):
        
        indiv = init_individual(method)
        
        while(is_duplicate(population_expressions, indiv)):
            indiv = init_individual(method)
            
        population.append(indiv)
        population_expressions.append(indiv.to_str())
            
    return population

In [14]:
def init_individual(method = 'grow', leaf_root: bool = False):
    roots = [ 
        Node(random.choice(function_set))
    ]

    if leaf_root: 
        terminal = random.choice(terminal_set);
        
        if isinstance(terminal, RandomEphemeralConstant): roots.append(Leaf(RandomEphemeralConstant()))
            
        else: roots.append(Leaf(terminal))

    individual = random.choice(roots)
    match method:
        case 'grow':
            grow(individual)
            return individual
        case _:
            pass

# Execution

## Fitness Cases (Dataset)

In [15]:
dataset_url = "192_vineyard.tsv"

dataset = pd.read_csv(dataset_url, sep = '\t')

# shuffle the dataset
dataset = dataset.sample(frac = 1, random_state = 123)

dataset.columns = ['x1', 'x2', 'y']

num_cases = dataset.shape[0]

num_train_cases = math.floor(TRAINING_SET_SPLIT * num_cases)


train_cases = dataset.iloc[:num_train_cases - 1, :]

test_cases = dataset.iloc[num_train_cases:, :]

In [16]:
train_cases.shape

(35, 3)

In [17]:
train_cases.head(3)

Unnamed: 0,x1,x2,y
48,0.5,8.0,11.0
35,3.0,10.0,17.0
30,3.0,8.0,15.0


## Primitive Sets

In [18]:
terminal_set = [Terminal('x1'), Terminal('x2'), RandomEphemeralConstant()]

In [19]:
def add(*args): return sum(args)

In [20]:
def minus(minuend, subtrahend): return minuend - subtrahend

In [21]:
def mul(*args): return reduce(lambda x, y: x * y, args)

In [22]:
def div(num, den): return 1 if den == 0 else num / den

In [23]:
function_set = [
    Func('+', 2, add), 
    Func('-', 2, minus), 
    Func('*', 2, mul), 
    Func('/', 2, div)]

## Fitness Calculation

In [24]:
def raw_fitness(prog: Node, cases: Iterable):
    fitness = 0;
    for i, case in cases.iterrows():
        fitness += abs(prog.exec(case) - case['y'])
    return fitness / len(cases);

In [25]:
def mean_std_of_objective_values(prog: Node, cases: Iterable):
    objective_values = [];
    for i, case in cases.iterrows():
        objective_values.append(prog.exec(case))
    objective_values = np.array(objective_values)
    return np.mean(objective_values), np.std(objective_values)

In [26]:
def fitness_stats(pop: [Node], cases: Iterable):
    best_fitness = raw_fitness(pop[0], cases)
    best_prog = pop[0]
    
    for prog in pop[1:]:
        fitness = raw_fitness(prog, cases)

        if fitness < best_fitness:
            best_fitness = fitness
            best_prog = prog
                
    return best_fitness, best_prog, *mean_std_of_objective_values(best_prog, test_cases)

## Selection

In [27]:
def tournament_select(population: [Node], cases, size = 2) -> Node:
    programs = random.choices(population, k = size)
    fitness = [0] * size
    for i, prog in enumerate(programs):
        fitness[i] = raw_fitness(prog, cases)
    
    selected_index = np.array(fitness, dtype=int).argsort()[0]
    return programs[selected_index]
        

## Genetic Operators

In [28]:
def crossover(parent1: Node, parent2: Node) -> [Node]:
    offspring1 = copy.deepcopy(parent1);
    offspring2 = copy.deepcopy(parent2);

    fragment1_pos = random.randrange(1, offspring1.node_count())
    fragment2_pos = random.randrange(1, offspring2.node_count())

    fragment1 = offspring1.get_node_at(fragment1_pos)[0]
    fragment2 = offspring2.get_node_at(fragment2_pos)[0]

    offspring2.set_node_at(fragment2_pos, fragment1)
    offspring1.set_node_at(fragment1_pos, fragment2)

    return [offspring1, offspring2]

In [29]:
def mutate(parent: Node) -> Node:
    mutant = copy.deepcopy(parent);
    
    point = random.randrange(1, mutant.node_count());

    fragment = init_individual(method='grow', leaf_root=True)

    mutant.set_node_at(point, fragment)

    return mutant

## Evolution

In [30]:
operators = ['crossover', 'mutation']

In [31]:
def gen_next_population(population: [Node]) -> [Node]:
    next_population = []

    while len(next_population) < POPULATION_SIZE:
        operator = random.choices(operators, weights=APPLICATION_RATES)
        
        match operator[0]:
            case 'crossover':
                parent1 = tournament_select(population, train_cases, size = TOURNAMENT_SIZE)
                parent2 = tournament_select(population, train_cases, size = TOURNAMENT_SIZE)

                offsprings = crossover(parent1, parent2)

                if(offsprings[0].is_oversized()): offsprings[0].prune()
                    
                if(offsprings[1].is_oversized()): offsprings[1].prune()
                    
                next_population.append(offsprings[0])

                if len(next_population) == POPULATION_SIZE : break
                    
                else: next_population.append(offsprings[1])
            case 'mutation':
                parent = tournament_select(population, train_cases, size = TOURNAMENT_SIZE)

                mutant = mutate(parent)

                if(mutant.is_oversized()): mutant.prune()

                next_population.append(mutant)
    return next_population
        

In [32]:
def run():
    seed = time.time()
    
    random.seed(seed)
    
    population = init_population()

    best_fitness, best_prog, mean, std = fitness_stats(population, train_cases)

    for i in range(GENERATIONS):
        population = gen_next_population(population);
        
        g_best_fitness, g_best_prog, g_mean, g_std = fitness_stats(population, train_cases)

        # print(f"Generation: {i}; |P|: {len(population)} Best fitness: {g_best_fitness}")
        # print(g_best_prog.to_str())
        # print()

        if g_best_fitness < best_fitness: 
            best_fitness = g_best_fitness
            best_prog = g_best_prog
            mean = g_mean
            std = g_std
        
        if best_fitness <= FITNESS_THRESHOLD:
            best_prog.print()
            break

    best_test_fitness = raw_fitness(best_prog, test_cases)
    
    print(f'''
            Seed: {seed};\n
            Best train fitness (MAS): {best_fitness};\n
            Best test fitness (MAS): {best_test_fitness}\n
            Mean: {mean}\n
            Standard deviation: {std}
        '''
    )
    print(best_prog.to_str())
    
    print()     

## Run

In [33]:
for i in range(RUNS):
    print(f"Run: {i} out of {RUNS}")
    run()

Run: 0 out of 10

            Seed: 1742510654.6653671;

            Best train fitness (MAS): 2.1200469884226494;

            Best test fitness (MAS): 2.4002055743490907

            Mean: 17.079605405207275

            Standard deviation: 4.330014253729426
        
+ ( + ( x2, + ( x1, 4.515035969091594 ) ), / ( / ( x1, x1 ), 3.5295848873272924 ) )

Run: 1 out of 10

            Seed: 1742510679.3093922;

            Best train fitness (MAS): 2.1013844142605316;

            Best test fitness (MAS): 2.230863624779654

            Mean: 17.173454499118616

            Standard deviation: 4.70870205045934
        
+ ( 5.048454499118617, * ( / ( + ( x1, x2 ), x1 ), x1 ) )

Run: 2 out of 10

            Seed: 1742510692.471257;

            Best train fitness (MAS): 2.1680196898491375;

            Best test fitness (MAS): 2.4621551632985677

            Mean: 16.58400869361146

            Standard deviation: 4.330014253729426
        
- ( + ( x2, + ( x1, 5.7471913092333065 ) ), 1.4444