# In [2]:
import torch
import numpy as np
import random
import copy

# In [27]:
class EA(object):
    """Evolutionary-algorithm operators over a population of reservoir models.

    A population is a list of dicts ("reservoirs"). The methods here read the
    keys ``'model'``, ``'epoch'``, ``'loss_results'`` and
    ``'accuracy_results'``; the last entry of each results list is used as
    the individual's current fitness. Each ``'model'`` must expose the weight
    attributes ``W_in``, ``W_r``, ``U`` and ``W_out``.
    """

    # Half-width of the uniform noise range and standard deviation of the
    # gaussian noise used by ``random_perturbation``.
    _NOISE_SCALE = 0.05

    # Scale applied to the weight difference when ``diff_mutation`` is
    # selected through ``mutation``.
    _DIFF_PERTURB_RATE = 2

    def __init__(self, population_size, val_loader, loss_function, input_size, reservoir_size, n_labels):
        """Store the evaluation inputs and the weight dimensions.

        Args:
            population_size: Stored as-is; not read by any method of this class.
            val_loader: Forwarded to ``evaluation`` by ``fitness``.
            loss_function: Forwarded to ``evaluation`` by ``fitness``.
            input_size: Second dimension of the ``W_in`` and ``U`` noise tensors.
            reservoir_size: First dimension of the ``W_in``, ``W_r`` and ``U``
                noise tensors and second dimension of ``W_r`` and ``W_out``.
            n_labels: First dimension of the ``W_out`` noise tensor.
        """
        self.population_size = population_size
        self.val_loader = val_loader
        self.loss_function = loss_function
        self.input_size = input_size
        self.reservoir_size = reservoir_size
        self.output_size = n_labels

    def fitness(self, population, parents=None):
        """Append one new epoch/loss/accuracy entry to every individual.

        Args:
            population: List of reservoir dicts; modified in place.
            parents: When truthy, the models are assumed unchanged, so the
                previous loss and accuracy are repeated and only the epoch
                counter advances. Otherwise every model is re-evaluated.

        Returns:
            The same ``population`` list.

        Note:
            ``evaluation`` is not defined in this class; it has to be
            available at module scope (presumably from another notebook
            cell) and return ``(epoch, loss, total_accuracy)``.
        """
        if parents:
            # Unchanged models: reuse the last results instead of evaluating.
            for reservoir in population:
                reservoir['epoch'].append(reservoir['epoch'][-1] + 1)
                reservoir['loss_results'].append(reservoir['loss_results'][-1])
                reservoir['accuracy_results'].append(reservoir['accuracy_results'][-1])
            return population

        # Evaluate every (mutated/recombined) model and record its results.
        for reservoir in population:
            epoch, loss, total_accuracy = evaluation(
                self.val_loader,
                reservoir['model'],
                reservoir['epoch'][-1] + 1,
                self.loss_function,
            )
            reservoir['epoch'].append(epoch)
            reservoir['loss_results'].append(loss)
            reservoir['accuracy_results'].append(total_accuracy)
            # TODO: checkpoint the model whenever it reaches a new best loss
            # (needs a per-model output directory).

        return population

    def mutation(self, pop, option, offspring_ratio, sample_dist):
        """Create ``offspring_ratio`` mutated copies of ``pop``.

        Args:
            pop: Parent population; left untouched.
            option: ``'random_perturbation'`` or ``'diff_mutation'``.
            offspring_ratio: Number of mutated batches per parent population.
                At least one batch is always produced, even for values
                below 1.
            sample_dist: Noise distribution for ``'random_perturbation'``
                (``'uniform'`` or ``'gaussian'``); ignored by
                ``'diff_mutation'``.

        Returns:
            A flat list holding all mutated individuals.

        Raises:
            ValueError: If ``option`` is not a known mutation scheme.
        """
        if option == 'random_perturbation':
            def make_batch():
                return self.random_perturbation(pop, sample_dist)
        elif option == 'diff_mutation':
            def make_batch():
                return self.diff_mutation(pop, self._DIFF_PERTURB_RATE)
        else:
            raise ValueError(
                "Unknown mutation option {!r}; expected 'random_perturbation' "
                "or 'diff_mutation'".format(option)
            )

        n_batches = max(1, offspring_ratio)
        if option == 'random_perturbation':
            # Report the ratio that is actually used, not a fixed "1 : 3".
            print('Parent / child ratio = 1 : {}'.format(n_batches))

        mut_pop = make_batch()
        for _ in range(n_batches - 1):
            mut_pop += make_batch()

        return mut_pop

    def diff_mutation(self, pop, perturb_rate):
        """Return a deep copy of ``pop`` mutated by differential perturbation.

        For every copied individual two distinct parents are sampled from
        ``pop`` and ``perturb_rate * (sample1 - sample2)`` is added to each of
        its weight attributes.

        Args:
            pop: Parent population; left untouched. Needs at least two
                individuals unless it is empty (``random.sample`` raises
                ``ValueError`` otherwise).
            perturb_rate: Scale applied to the weight difference.

        Returns:
            The list of mutated copies.
        """
        mut_pop = copy.deepcopy(pop)

        for reservoir in mut_pop:
            model = reservoir['model']

            # Randomly sample two distinct parents to build the difference.
            first, second = (picked['model'] for picked in random.sample(pop, 2))

            model.W_in += perturb_rate * (first.W_in - second.W_in)
            model.W_r += perturb_rate * (first.W_r - second.W_r)
            model.U += perturb_rate * (first.U - second.U)

            # W_out is rebuilt as a fresh non-trainable parameter instead of
            # being updated in place. ``torch.nn`` is spelled out because only
            # ``torch`` itself is imported by this file.
            new_w_out = model.W_out + perturb_rate * (first.W_out - second.W_out)
            model.W_out = torch.nn.Parameter(new_w_out, requires_grad=False)

        return mut_pop

    def _draw_noise(self, sample_dist, rows, cols):
        """Return a ``(rows, cols)`` noise tensor drawn from ``sample_dist``."""
        if sample_dist == 'uniform':
            return torch.empty(rows, cols).uniform_(-self._NOISE_SCALE, self._NOISE_SCALE)
        if sample_dist == 'gaussian':
            return torch.empty(rows, cols).normal_(0, self._NOISE_SCALE)
        raise ValueError(
            "Unknown sample_dist {!r}; expected 'uniform' or 'gaussian'".format(sample_dist)
        )

    def random_perturbation(self, pop, sample_dist):
        """Return a deep copy of ``pop`` with random noise added to the weights.

        Args:
            pop: Parent population; left untouched.
            sample_dist: ``'uniform'`` (range +/-0.05) or ``'gaussian'``
                (mean 0, standard deviation 0.05).

        Returns:
            The list of perturbed copies.

        Raises:
            ValueError: If ``sample_dist`` is not a known distribution.
        """
        mut_pop = copy.deepcopy(pop)

        # Draw order (W_in, W_r, W_out, U) is kept fixed so that seeded runs
        # stay reproducible.
        # NOTE(review): one set of noise tensors is drawn per call and added
        # to every individual - confirm that per-individual noise is not
        # what was intended.
        w_in_noise = self._draw_noise(sample_dist, self.reservoir_size, self.input_size)
        w_r_noise = self._draw_noise(sample_dist, self.reservoir_size, self.reservoir_size)
        w_out_noise = self._draw_noise(sample_dist, self.output_size, self.reservoir_size)
        u_noise = self._draw_noise(sample_dist, self.reservoir_size, self.input_size)

        for reservoir in mut_pop:
            model = reservoir['model']
            model.W_in += w_in_noise
            model.W_r += w_r_noise
            model.U += u_noise

            # requires_grad has to be switched off first, because pytorch does
            # not allow in-place changes on leaf tensors that require grad.
            # See https://discuss.pytorch.org/t/leaf-variable-was-used-in-an-inplace-operation/308/2 .
            model.W_out.requires_grad = False
            model.W_out += w_out_noise

        return mut_pop

    @staticmethod
    def _rank(individuals, option):
        """Return ``individuals`` sorted best-first by their latest result.

        ``'loss'`` ranks the lowest loss first, ``'accuracy'`` the highest
        accuracy first; anything else raises ``ValueError``.
        """
        if option == 'loss':
            return sorted(individuals, key=lambda ind: ind['loss_results'][-1])
        if option == 'accuracy':
            return sorted(individuals, key=lambda ind: ind['accuracy_results'][-1], reverse=True)
        raise ValueError(
            "Unknown selection option {!r}; expected 'loss' or 'accuracy'".format(option)
        )

    def parent_offspring_selection(self, pop, recomb_pop, option):
        """Keep the ``len(pop)`` best individuals of parents and offspring combined.

        Args:
            pop: Parent population.
            recomb_pop: Offspring population.
            option: ``'loss'`` (lowest wins) or ``'accuracy'`` (highest wins).

        Returns:
            A new list with the selected individuals, best first.

        Raises:
            ValueError: If ``option`` is not a known criterion.
        """
        return self._rank(pop + recomb_pop, option)[:len(pop)]

    def keep_best_selection(self, pop, offspring, option):
        """Keep the best half of the parents and fill up with the best offspring.

        Args:
            pop: Parent population; ``len(pop) // 2`` of them survive.
            offspring: Offspring population; supplies the remaining slots.
            option: ``'loss'`` (lowest wins) or ``'accuracy'`` (highest wins).

        Returns:
            A new list: surviving parents first, then selected offspring.

        Raises:
            ValueError: If ``option`` is not a known criterion.
        """
        k_best = len(pop) // 2
        k_left = len(pop) - k_best

        return self._rank(pop, option)[:k_best] + self._rank(offspring, option)[:k_left]

    def crossover(self, pop):
        """Return a deep copy of ``pop`` with randomly recombined weights.

        Every child receives, independently for ``W_in``, ``U``, ``W_r`` and
        ``W_out``, a copy of that weight taken from a randomly chosen parent.

        Args:
            pop: Parent population; left untouched.

        Returns:
            The list of recombined copies.
        """
        crossed_pop = copy.deepcopy(pop)

        parent_models = [reservoir['model'] for reservoir in pop]
        W_in = [model.W_in for model in parent_models]
        W_r = [model.W_r for model in parent_models]
        U = [model.U for model in parent_models]
        W_out = [model.W_out for model in parent_models]

        for reservoir in crossed_pop:
            child = reservoir['model']
            # Copy the chosen weights: assigning the parent's object directly
            # would make children share storage with parents and with each
            # other, and later in-place mutations would hit all of them.
            child.W_in = copy.deepcopy(random.choice(W_in))
            child.U = copy.deepcopy(random.choice(U))
            child.W_r = copy.deepcopy(random.choice(W_r))
            child.W_out = copy.deepcopy(random.choice(W_out))

        return crossed_pop

    def selection(self, pop, offspring, option, select_mech):
        """Pick the next generation using the requested selection mechanism.

        Args:
            pop: Parent population.
            offspring: Offspring population.
            option: Ranking criterion, ``'loss'`` or ``'accuracy'``.
            select_mech: ``'merge_all'`` ranks parents and offspring together;
                ``'keep_k_best'`` keeps the best half of the parents and
                fills the rest with the best offspring.

        Returns:
            The selected population.

        Raises:
            ValueError: If ``select_mech`` or ``option`` is not recognised.
        """
        if select_mech == 'merge_all':
            return self.parent_offspring_selection(pop, offspring, option)
        if select_mech == 'keep_k_best':
            return self.keep_best_selection(pop, offspring, option)
        raise ValueError(
            "Unknown select_mech {!r}; expected 'merge_all' or 'keep_k_best'".format(select_mech)
        )

    def step(self, pop, mutate_opt, select_opt, select_mech, offspring_ratio, sample_dist):
        """Run one generation: mutate, recombine, evaluate and select.

        Args:
            pop: Current population.
            mutate_opt: Mutation scheme, forwarded to ``mutation``.
            select_opt: Ranking criterion, forwarded to ``selection``.
            select_mech: Selection mechanism, forwarded to ``selection``.
            offspring_ratio: Mutated batches per parent population.
            sample_dist: Noise distribution for random perturbation.

        Returns:
            The population selected for the next generation.
        """
        # Apply mutation, then recombine the mutated individuals.
        mut_pop = self.mutation(pop, mutate_opt, offspring_ratio, sample_dist)
        crossed_pop = self.crossover(mut_pop)

        # Candidates = mutated + (mutated AND crossed), for a larger pool.
        merged_pop = mut_pop + crossed_pop

        # Parents are unchanged, so their last results are carried forward.
        pop = self.fitness(pop, parents=True)

        # Offspring have to be evaluated.
        print('Possible candidates for optimization')
        merged_pop = self.fitness(merged_pop, parents=False)

        # Survivor selection.
        return self.selection(pop, merged_pop, select_opt, select_mech)