## Imports 

In [1]:
import numpy as np
import os
import random
from random import choice 
from copy import deepcopy
import pickle

## Reading databases

In [2]:
def read_databases(filename):
    x1 = []
    x2 = []
    y = []
    for line in open(filename):
        csv_row = line.strip().split(',')
        csv_row = [float(number) for number in csv_row]
        x1.append(csv_row[0])
        x2.append(csv_row[1])
        y.append(csv_row[2])
    return x1, x2, y

In [3]:
filename = 'datasets/synth1/synth1-test.csv'
x1_train, x2_train, y_train = read_databases(filename)

In [4]:
filename = 'datasets/synth1/synth1-train.csv'
x1_test, x2_test, y_test = read_databases(filename)

## Defining individual class

In [5]:
expression = [['expression', 'op', 'expression'], 'variable', 'value']
operation = ['+', '-', '*']
variable = ['x1', 'x2']
value = ['random']

In [6]:
terminals_dict = dict()
terminals_dict['op'] = operation
terminals_dict['variable'] = variable
terminals_dict['value'] = value

In [7]:
class Node(object):
    def __init__(self, data='op', level=0):
        self.data = data
        self.type = data
        self.left_child = None
        self.right_child = None
        self.level = level
        self.max_level = 7
        self.terminal = False
        self._fitness = None

    def initial_node(self):
        self.add_left('expression')
        self.add_right('expression')
    
    def initialize(self):
        self.add_left('expression')
        self.add_right('expression')
        self.expand_children()
        self.get_parameters()

    def add_left(self, value):
        self.left_child = Node(value, level=self.level+1)
    
    def add_right(self, value):
        self.right_child = Node(value, level=self.level+1)
        
    def show(self):
        print(self.level, ": " , self.data)
        if self.left_child:
            self.left_child.show()
        
        if self.right_child:
            self.right_child.show()
    
    def get_expression(self):
        if self.data == 'expression':
            self.data = choice(expression)
            if type(self.data) == list:
                if self.level < self.max_level: 
                    self.add_left(self.data[0])
                    self.add_right(self.data[2])
                    self.data = self.data[1]
                else:
                    self.data = 'value'
                    
        if self.data != 'op':
            self.terminal = True
        else:
            self.terminal = False
                    
        if self.left_child is not None:
            self.left_child.get_expression()
        
        if self.right_child is not None:
            self.right_child.get_expression()
        
        self.type = self.data
    
    def expand_children(self):
        self.left_child.get_expression()
        self.right_child.get_expression()
    
    def get_parameters(self):
        self.data = choice(terminals_dict[self.data])
        if self.data == 'random':
            self.data = random.random()*5
        
        if self.left_child is not None:
            self.left_child.get_parameters()
        
        if self.right_child is not None:
            self.right_child.get_parameters()
    
    def evaluate(self, x1=0, x2=0):
        
        if self.terminal:
            if self.data == 'x1':
                return x1
            elif self.data == 'x2':
                return x2
            else:
                return self.data
            
        if self.left_child is not None:
            result_left = self.left_child.evaluate(x1, x2)
        if self.right_child is not None:
            result_right = self.right_child.evaluate(x1, x2)
        
        if self.data == '+':
            return result_left + result_right
        elif self.data == '-':
            return result_left - result_right
        elif self.data == '*':
            return result_left * result_right
    
    def error(self, x1, x2, y):
        return (y - self.evaluate(x1, x2))**2
    
    def fitness(self, x1_list, x2_list, y_list):
        
        if self._fitness == None:
            error_sum = 0
            for x1, x2, y in zip(x1_list, x2_list, y_list):
                error_sum += self.error(x1, x2, y)
            y_mean = y_list - np.mean(y_list)
            y_mean_sum = sum(y_mean**2)
            self._fitness = (error_sum/y_mean_sum)**0.5
            
        return self._fitness
    
    def change_node(self):
        self.data = choice(terminals_dict[self.type])
        if self.data == 'random':
            self.data = random.random()*5
    
    def mutate(self, m_level):
        if m_level == self.level:
            return self.change_node()
        elif np.random.rand() < 0.5 and self.left_child is not None:
            self.left_child.mutate(m_level)
        elif self.right_child is not None:
            self.right_child.mutate(m_level)
        else:
            return self.change_node()
    
    def subtree_mutate(self, m_level):
        if m_level == self.level:
            self.data = 'op'
            self.type = 'op'
            self.terminal = False
            self.add_left('expression')
            self.add_right('expression')
            self.get_expression()
            self.get_parameters()
            return self
        elif np.random.rand() < 0.5 and self.left_child is not None:
            self.left_child.subtree_mutate(m_level)
        elif self.right_child is not None:
            self.right_child.subtree_mutate(m_level)
        else:
            self.data = 'op'
            self.type = 'op'
            self.terminal = False
            self.add_left('expression')
            self.add_right('expression')
            self.get_expression()
            self.get_parameters()
            return self
    
    def subtree(self, directions_list):
        if len(directions_list) <= 1:
            if directions_list[0] == 'left':
                #return self.left_child
                return self.left_child if self.left_child else self
            elif directions_list[0] == 'right':
                return self.right_child if self.right_child else self
            else:
                raise ValueError('Please give a valid id for tree directions ["left"] or ["right"].', directions_list[0])
        elif directions_list[0] == 'left':
            return self.left_child.subtree(directions_list[1:]) if self.left_child else self
        elif directions_list[0] == 'right':
            return self.right_child.subtree(directions_list[1:]) if self.right_child else self
        else:
            raise ValueError('Please give a valid id for tree directions ["left"] or ["right"].', directions_list[0])
    
    def alter_subtree(self, directions_list, subtree):
        if self.left_child == None:
                self.data = subtree.data
                self.terminal = subtree.terminal
                self.left_child = subtree.left_child
                self.right_child = subtree.right_child
                self.fix_level(self.level)
                #print("Terminal")
        
        elif len(directions_list) == 1:
            if directions_list[0] == 'left':
                self.left_child = subtree
                self.left_child.fix_level(self.level+1)
            elif directions_list[0] == 'right':
                self.right_child = subtree
                self.right_child.fix_level(self.level+1)
            else:
                raise ValueError('Please give a valid id for tree directions ["left"] or ["right"].', directions_list[0])
        
        elif directions_list[0] == 'left':
            if self.left_child:
                self.left_child.alter_subtree(directions_list[1:], subtree)
            else:
                self.left_child = subtree
                self.left_child.fix_level(self.level+1)
        
        elif directions_list[0] == 'right':
            if self.right_child:
                self.right_child.alter_subtree(directions_list[1:], subtree)
            else:
                self.right_child = subtree
                self.right_child.fix_level(self.level+1)
        
        else:
            raise ValueError('Please give a valid id for tree directions ["left"] or ["right"].', directions_list[0])
    
    def gen_directions(self, level):
        directions = ['left', 'right']
        directions_list = []
        for i in range(level):
            directions_list.append(np.random.choice(directions))
        return directions_list
    
    def fix_level(self, level):
        self.level = level
        if self.left_child:
            self.left_child.fix_level(level+1)
        if self.right_child:
            self.right_child.fix_level(level+1)
        
    def crossover(self, parent, crossover_level1, crossover_level2):
        directions_list1 = self.gen_directions(crossover_level1)
        directions_list2 = self.gen_directions(crossover_level2)
        #print(directions_list1, directions_list2)
        subtree1 = deepcopy(self.subtree(directions_list1))
        subtree2 = deepcopy(parent.subtree(directions_list2))
        #subtree1.show()
        #subtree2.show()
        self.alter_subtree(directions_list1, subtree2)
        parent.alter_subtree(directions_list2, subtree1)
        self.fix_level(0)
        parent.fix_level(0)
    
    def tree_deepness(self):
        if self.left_child == None:
            return self.level
        
        else:
            deep = max(self.left_child.tree_deepness(), self.right_child.tree_deepness())
            return deep

In [8]:
root = Node()
root.initial_node()
root.expand_children()
root.show()

root.get_parameters()
print()
root.show()

root.evaluate()

0 :  op
1 :  value
1 :  variable

0 :  +
1 :  4.202216523128808
1 :  x2


4.202216523128808

In [9]:
root.evaluate(2,5)

9.202216523128808

In [10]:
root.error(2,5, 2)

51.87192284602961

## Generating initial population

In [11]:
class Genetic():
    def __init__(self, population_size, crossover_probability=0.9, x_data=None, y_data=None, initial_population=False):
        if crossover_probability < 0 or crossover_probability > 1 :
            raise ValueError('Please give a valid number for crossover probability.')
        self.max_level = 7
        self.population_size = population_size
        self.crossover_probability = crossover_probability
        self.mutation_probability = 1 - crossover_probability
        if initial_population:
            self.population = initial_population
        else:
            self.population = []
            for i in range(self.population_size):
                root = Node()
                root.initialize()
                self.population.append(root)
        self.x_values = x_data
        self.y_values = y_data
        self.sort_by_fitness()
    
    def sort_by_fitness(self):  #, x1_list=self.x_values[0], x2_list=self.x_valus[1], y_list=self.y_values):
        x1_list=self.x_values[0]
        x2_list=self.x_values[1]
        y_list=self.y_values
        self.population.sort(key = lambda x: x.fitness(x1_list, x2_list, y_list))
    
    def rank_selection_pair(self):
        ## Assign probabilities inversely proportional to the position index
        p = list(range(len(solution1.population))) + np.array(1)
        p = np.flipud(p)  # reverse array
        p = p/sum(p)
        parents = np.random.choice(self.population,size=2, p=p)
        p_fitness = [parents[0]._fitness, parents[1]._fitness]
        return (parents, p_fitness) #Return two individuals from population, and their fitness
    
    def tournament(self, k=2):
        selected_index = []
        parents = []
        p_fitness = []
        for i in range(2):
            index = np.random.choice(self.population_size,size=k)
            selected_index.append(max(index, key=lambda p: self.population[p]._fitness))
            parents.append(deepcopy(self.population[selected_index[i]]))
            p_fitness.append(self.population[selected_index[i]]._fitness)
        return (selected_index, parents, p_fitness)
    
    def generate_offspring(self):
        if np.random.rand() < self.crossover_probability:
            ## Crossover
            print("Crossover")
            return self.crossover()
        else:
            ## Mutation
            print("Mutation")
            return self.mutation()
    
    def iteration(self, elitism=False, elitist_operators=False):
        index, parents, p_fitness = self.tournament()
        better = 0
        for parent in parents:
            parent._fitness = None
        
        if np.random.rand() < self.crossover_probability:
            ## Crossover
            children = self.crossover(parents)
        else:
            ## Mutation
            children = self.mutation(parents)
        for child in children:
            child.fitness(*self.x_values, self.y_values)

        if elitist_operators:
            for i in range(2):
                if children[i]._fitness < p_fitness[i]:
                    self.population[index[i]] = children[i]
                    #print("Update!!")
                    better = 1
            self.sort_by_fitness()
        else:
            self.population.extend(children)
            self.sort_by_fitness()
            self.population = self.population[:self.population_size]
        
        best = self.population[0]._fitness
        mean_fitness = np.mean([self.population[i]._fitness for i in range(self.population_size)])
        worst = self.population[-1]._fitness
        
        #print("Best: ", best, end="", flush=True)
        #print("  Average: ", mean_fitness, end="", flush=True)
        #print("  Worst: ", worst, '\n')
        data = [best, mean_fitness, worst, better]
        return data
        
            
    def crossover(self, parents, elitism=False):
        ## Chosing mutation level with a pobability 
        children = [deepcopy(parents[0]), deepcopy(parents[1])]
        #print(children[0].show())
        #print(children[1].show())
        p = list(range(self.max_level)) + np.array(1)
        p = p/sum(p)
        
        crossover_level1 = np.random.choice(self.max_level, p=p) + 1  ## Avoid this number to be zero
        crossover_level2 = np.random.choice(self.max_level, p=p) + 1
        children[0].crossover(children[1], crossover_level1, crossover_level2)
        
        if children[0].tree_deepness() > children[0].max_level or children[1].tree_deepness() > children[1].max_level:
            return self.mutation(parents)
        else:
            return children
    
    def mutation(self, parents, elitism=False):
        children = [deepcopy(parents[0]), deepcopy(parents[1])]
        #print(children[0].show())
        #print(children[1].show())
        p = list(range(self.max_level)) + np.array(1)
        p = p/sum(p)
        for child in children:
            mutation_level = np.random.choice(self.max_level, p=p)
            child.subtree_mutate(mutation_level)
        return children
    
    def evolve(self, generations=100, elitist_operators=True, k=2):
        data = []  # Best, average, worst, 
        for i in range(generations):
            data.append(self.iteration(elitist_operators=elitist_operators))
        return data
    
    def predict(self, x_data, y_data):
        self.x_values = x_data
        self.y_values = y_data
        self.sort_by_fitness()
        return [self.population[i]._fitness for i in range(self.population_size)]

## Train - Synth 1 

In [12]:
##Parameters
population_size = 100
crossover_probability = 0.9
number_of_generations = 5000
k = 2

solution1 = Genetic(population_size, crossover_probability, [x1_train, x2_train], y_train)
initial_population = solution1.population

In [14]:
full_train_data = []
test_data = []
for population_size in [100, 500]:
    full_train_data = []
    test_data = []
    for i in range(30):
        print("Iteration ", i)
        solution1 = Genetic(population_size, crossover_probability, [x1_train, x2_train], y_train)
        data = solution1.evolve(number_of_generations, k)
        full_train_data.append(data)

        test_data.append(solution1.predict([x1_test, x2_test], y_test))
    
    filename = 'population_estimation_' + str(population_size) + '.pickle'
    with open(filename, 'wb') as f:
    # Pickle the 'data' dictionary using the highest protocol available.
        pickle.dump([full_train_data, test_data], f, pickle.HIGHEST_PROTOCOL)

Iteration  0
Iteration  1
Iteration  2
Iteration  3
Iteration  4
Iteration  0
Iteration  1
Iteration  2
Iteration  3
Iteration  4


In [18]:
population_size = 50
number_of_generations = 5000
full_train_data = []
test_data = []
solution1 = Genetic(population_size, crossover_probability, [x1_train, x2_train], y_train)
initial_population = solution1.population
for crossover_probability in [0.3, 0.5, 0.7, 0.9]:
    full_train_data = []
    test_data = []
    for i in range(30):
        print("Iteration ", i)
        solution1 = Genetic(population_size, crossover_probability, [x1_train, x2_train], y_train, initial_population=initial_population )
        data = solution1.evolve(number_of_generations, k)
        full_train_data.append(data)

        test_data.append(solution1.predict([x1_test, x2_test], y_test))
    
    filename = 'probability_estimation_' + str(crossover_probability) + '.pickle'
    with open(filename, 'wb') as f:
    # Pickle the 'data' dictionary using the highest protocol available.
        pickle.dump([full_train_data, test_data], f, pickle.HIGHEST_PROTOCOL)

Iteration  0
Iteration  1
Iteration  2
Iteration  3
Iteration  4
Iteration  0
Iteration  1
Iteration  2
Iteration  3
Iteration  4
Iteration  0
Iteration  1
Iteration  2
Iteration  3
Iteration  4
Iteration  0
Iteration  1
Iteration  2
Iteration  3
Iteration  4
