Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# LAB9

Write a local-search algorithm (eg. an EA) able to solve the *Problem* instances 1, 2, 5, and 10 on a 1000-loci genomes, using a minimum number of fitness calls. That's all.

### Deadlines:

* Submission: Sunday, December 3 ([CET](https://www.timeanddate.com/time/zones/cet))
* Reviews: Sunday, December 10 ([CET](https://www.timeanddate.com/time/zones/cet))

Notes:

* Reviews will be assigned  on Monday, December 4
* You need to commit in order to be selected as a reviewer (ie. better to commit an empty work than not to commit)

In [1]:
from random import random, choice, randint, choices, sample, shuffle
from dataclasses import dataclass
from copy import copy
import numpy as np
import pandas as pd

import lab9_lib

### Individual class
Class used to store the fitness and the genotype of each individual.

In [2]:
@dataclass
class Individual:
    fitness: tuple
    genotype: list[bool]

### Crossover Methods
Set of crossover methods. Given two individuals: recombinate them to generate a new individual.

In [3]:
def one_cut_xover(ind1: Individual, ind2: Individual) -> Individual:
    assert len(ind1.genotype) == len(ind2.genotype)

    k = len(ind1.genotype)
    cut_point = randint(0, k-1)
    offspring = Individual(fitness=None, genotype=ind1.genotype[:cut_point] + ind2.genotype[cut_point:])
    assert len(offspring.genotype) == k
    
    return offspring

def n_cuts_xover(ind1: Individual, ind2: Individual, n: int = 2) -> Individual:
    assert len(ind1.genotype) == len(ind2.genotype)

    k = len(ind1.genotype)
    
    cut_points = sorted(sample(range(k), n))
    genotype = []
    prev_cut_point = 0
    for i in range(len(cut_points)):
        if i % 2 == 0:
            genotype += ind1.genotype[prev_cut_point:cut_points[i]]
        else:
            genotype += ind2.genotype[prev_cut_point:cut_points[i]]
        prev_cut_point = cut_points[i]

    if len(cut_points) % 2 == 0:
        genotype += ind1.genotype[prev_cut_point:]
    else:
        genotype += ind2.genotype[prev_cut_point:]

    
    offspring = Individual(fitness=None, genotype=genotype)
    assert len(offspring.genotype) == k

    return offspring

def uniform_xover(ind1: Individual, ind2: Individual) -> Individual:
        assert len(ind1.genotype) == len(ind2.genotype)

        k = len(ind1.genotype)
        genotype = []
        for i in range(k):
            if i % 2 == 0:
                genotype.append(ind1.genotype[i])
            else:
                genotype.append(ind2.genotype[i])

        offspring = Individual(fitness=None, genotype=genotype)
        assert len(offspring.genotype) == k

        return offspring

def my_xover(ind1: Individual, ind2: Individual) -> Individual:
        assert len(ind1.genotype) == len(ind2.genotype)

        k = len(ind1.genotype)
        cut_point = int((ind1.fitness / (ind1.fitness + ind2.fitness)) * k)
        offspring = Individual(fitness=None, genotype=ind1.genotype[:cut_point] + ind2.genotype[cut_point:])
        assert len(offspring.genotype) == k
        
        return offspring

In [4]:
xovers = [one_cut_xover, n_cuts_xover, uniform_xover, my_xover]

### Mutation Methods

In [5]:
def single_gene_mutate(ind: Individual) -> Individual:
    k = len(ind.genotype)
    offspring = copy(ind)
    pos = randint(0, k-1)
    offspring.genotype[pos] = not offspring.genotype[pos]
    offspring.fitness = None
    
    return offspring

def variable_strength_mutate(ind: Individual) -> Individual:
    k = len(ind.genotype)
    offspring = copy(ind)
    strength = 0.5

    while(random() < strength):
        pos = randint(0, k-1)
        offspring.genotype[pos] = not offspring.genotype[pos]
        offspring.fitness = None

    return offspring

def swap_mutate(ind: Individual) -> Individual:
    k = len(ind.genotype)
    offspring = copy(ind)
    pos1, pos2 = sample(range(k), 2)
    tmp = offspring.genotype[pos1]
    offspring.genotype[pos1] = offspring.genotype[pos2]
    offspring.genotype[pos2] = tmp

    return offspring

def inversion_mutate(ind: Individual) -> Individual:
    k = len(ind.genotype)
    offspring = copy(ind)
    pos1, pos2 = sorted(sample(range(k), 2))
    offspring.genotype[pos1:pos2] = [1 - value for value in offspring.genotype[pos1:pos2]]

    return offspring

In [6]:
mutates = [single_gene_mutate, variable_strength_mutate, swap_mutate, inversion_mutate]

### Parent Selection Methods

In [7]:
def tournament_selection(population: list[Individual], tournament_size: int = 2) -> Individual:
    pool = [choice(population) for _ in range(tournament_size)]
    champion = max(pool, key=lambda i: i.fitness)
    return champion

### Survival selection methods

In [8]:
def steady_state(population: list[Individual], offsprings: list[Individual]) -> Individual:
    population_size = len(population)
    population.extend(offsprings)
    population.sort(key=lambda i: i.fitness, reverse=True)
    population = population[:population_size]

    return population

### Termination Conditions

In [9]:
def terminate(ind: Individual) -> bool:
    return ind.fitness == 1.0

### Population Initialization 

In [10]:
def initialize_population(k, population_size, fitness) -> list[Individual]:
    population = [
    Individual(
        genotype=[choice([0, 1]) for _ in range(k)],
        fitness=None,
    )
    for _ in range(population_size)
    ]

    for i in population:
        i.fitness = fitness(i.genotype)

    return population

### Migration Methods

In [11]:
def random_migrate(population1: list[Individual], population2: list[Individual], migrants: int = 2) -> tuple[list[Individual], list[Individual]]:
    l1 = len(population1)
    l2 = len(population2)

    migrants_population1 = sample(range(l1), migrants)
    migrants_population2 = sample(range(l2), migrants)

    for m1, m2 in zip(migrants_population1, migrants_population2):
        population2[m1], population1[m2] = population1[m1], population2[m2]

    return (population1, population2)

In [12]:
migrates = [random_migrate]

### Self-Adaptive mutation probability

In [13]:
from scipy.spatial.distance import cityblock

def average_manhattan_distance(ls: list[list[int]]):
    total_distance = 0
    counter = 0

    for i in range(len(ls)):
        for j in range(i+1, len(ls)):
            total_distance += cityblock(ls[i], ls[j])
            counter += 1

    return (total_distance / counter) / len(ls[0])

def self_adapt(population: list[Individual], mutation_probability: float):
    new_mutation_probability = mutation_probability
    genotypes = [ind.genotype for ind in population]

    if average_manhattan_distance(genotypes) < 1 / 10:
        new_mutation_probability *= 1.1
    else:
        new_mutation_probability /= 1.1

    return new_mutation_probability

### Evolution Algorithm

In [14]:
def evolution_algorithm(population: list[Individual], num_generations: int, mutation_probability: int,
                        offspring_size: int, mutate: callable, xover: callable, fitness: callable,
                        parent_selection: callable, survival_selection: callable, terminate: callable,
                        self_adaptive_prob=False, verbose=False) -> tuple[float, int]:
        fitness_calls = 0
        counter = 0

        for generation in range(num_generations):
            old_best = population[0].fitness
            offspring = list()
            for _ in range(offspring_size):
                if random() < mutation_probability:
                    p = parent_selection(population)
                    o = mutate(p)
                else:
                    p1 = parent_selection(population)
                    p2 = parent_selection(population)
                    o = xover(p1, p2)
                offspring.append(o)

            for i in offspring:
                i.fitness = fitness(i.genotype)
                fitness_calls += 1

            population = survival_selection(population, offspring)

            if verbose:
                print(f"Generation: {generation+1} \t Best individual fitness: {population[0].fitness} \t mutation probability: {mutation_probability}")

            if self_adaptive_prob:
                if generation % 100 == 0 and generation != 0:
                    mutation_probability = self_adapt(population, mutation_probability)

            if terminate(population[0]):
                break

            if old_best == population[0].fitness:
                 counter += 1
                 if counter > 100:
                    break
            else:
                 counter = 0
 
        return (population[0].fitness, fitness_calls)

### Island model

In [15]:
def island_model(population: list[Individual], num_generations: int, mutation_probability: int,
                offspring_size: int, mutate: callable, xover: callable, fitness: callable,
                parent_selection: callable, survival_selection: callable, terminate: callable,
                n_sub_populations: int, migrate: callable, migration_rate: int, self_adaptive_prob=False, 
                verbose=False) -> tuple[float, int]:
        fitness_calls = 0
        best_fitness = 0
        sub_populations = [population[i::n_sub_populations] for i in range(n_sub_populations)]

        for generation in range(num_generations):
            old_best = best_fitness

            for j, sub_population in enumerate(sub_populations):
                offspring = list()
                for _ in range(offspring_size):
                    if random() < mutation_probability:
                        p = parent_selection(sub_population)
                        o = mutate(p)
                    else:
                        p1 = parent_selection(sub_population)
                        p2 = parent_selection(sub_population)
                        o = xover(p1, p2)
                    offspring.append(o)

                for i in offspring:
                    i.fitness = fitness(i.genotype)
                    fitness_calls += 1
                
                sub_populations[j] = survival_selection(sub_population, offspring)

            if generation % migration_rate == 0 and generation != 0:
                shuffle(sub_populations)
                for ind in range(0, len(sub_populations), 2):
                    sub_populations[ind], sub_populations[ind+1] = migrate(sub_populations[ind], sub_populations[ind+1])

            best_fitness = max(sub_population[0].fitness for sub_population in sub_populations)
            if verbose:
                print(f"Generation: {generation+1} \t Best individual fitness: {best_fitness} \t mutation probability: {mutation_probability}")

            if self_adaptive_prob:
                if generation % 100 == 0 and generation != 0:
                    mutation_probability = self_adapt(population, mutation_probability)

            if terminate(population[0]):
                break

            if old_best == best_fitness:
                 counter += 1
                 if counter > 100:
                    break
            else:
                 counter = 0

        best_fitness = max(sub_population[0].fitness for sub_population in sub_populations)
        return (best_fitness, fitness_calls)

# Problem Solution

### a) Testing the Evolution algorithm *without* promoting diversity

Setting parameters'values

In [16]:
NUM_GENERATIONS = 10000
K = 1000
POPULATION_SIZE = 50
OFFSPRING_SIZE = 33
MUTATION_PROBABILITY = 0.15

Example of EA *without* self-adapting mutation probability

In [17]:
fitness = lab9_lib.make_problem(1)
population = initialize_population(K, POPULATION_SIZE, fitness)

best_fitness, fitness_calls = evolution_algorithm(population, NUM_GENERATIONS, MUTATION_PROBABILITY, OFFSPRING_SIZE,
                                                single_gene_mutate, one_cut_xover, fitness,
                                                tournament_selection, steady_state, terminate,
                                                verbose=False, self_adaptive_prob=False)

best_fitness, fitness_calls

(0.96, 34617)

Example of EA *with* self-adapting mutation probability

In [18]:
fitness = lab9_lib.make_problem(1)
population = initialize_population(K, POPULATION_SIZE, fitness)

best_fitness, fitness_calls = evolution_algorithm(population, NUM_GENERATIONS, MUTATION_PROBABILITY, OFFSPRING_SIZE,
                                                single_gene_mutate, one_cut_xover, fitness,
                                                tournament_selection, steady_state, terminate,
                                                verbose=False, self_adaptive_prob=True)

best_fitness, fitness_calls

(0.957, 33297)

Test all xover / mutation methods implemented and store the result in a Pandas dataframe

In [19]:
columns = ['fitness function', 'xover', 'mutate', 'self adapting probability', 'best fitness value', 'fitness calls']
df = pd.DataFrame(columns=columns)

In [20]:
counter = 0
fitness_numbers = [1, 2, 5, 10]
self_adaptive_options = [False, True]

for fitness_number in fitness_numbers:
    fitness = lab9_lib.make_problem(fitness_number)
    # use the same initial population for each combination of xover and mutation
    population = initialize_population(K, POPULATION_SIZE, fitness)
    for xover in xovers:
        for mutate in mutates:
                for self_adaptive_prob in self_adaptive_options:

                    best_fitness, fitness_calls = evolution_algorithm(
                                                            population, 
                                                            NUM_GENERATIONS, MUTATION_PROBABILITY, OFFSPRING_SIZE,
                                                            mutate, xover, fitness,
                                                            tournament_selection, steady_state, terminate,
                                                            verbose=False, self_adaptive_prob=self_adaptive_prob
                                                            )
                    
                    # store the result in the dataframe
                    df.loc[counter] = [
                         fitness_number,
                         xover.__name__,
                         mutate.__name__,
                         self_adaptive_prob,
                         str(round(best_fitness*100, 2))+"%",
                         fitness_calls
                         ]
                    counter += 1

Print the result

In [21]:
from tabulate import tabulate

print(tabulate(df, headers = 'keys', tablefmt = 'psql'))

+-----+--------------------+---------------+--------------------------+-----------------------------+----------------------+-----------------+
|     |   fitness function | xover         | mutate                   | self adapting probability   | best fitness value   |   fitness calls |
|-----+--------------------+---------------+--------------------------+-----------------------------+----------------------+-----------------|
|   0 |                  1 | one_cut_xover | single_gene_mutate       | False                       | 97.2%                |           40953 |
|   1 |                  1 | one_cut_xover | single_gene_mutate       | True                        | 96.7%                |           42867 |
|   2 |                  1 | one_cut_xover | variable_strength_mutate | False                       | 98.5%                |           77649 |
|   3 |                  1 | one_cut_xover | variable_strength_mutate | True                        | 93.1%                |           52239 |

### b) Testing the Evolution algorithm promoting diversity: the Island Model

Setting parameters'values

In [22]:
NUM_GENERATIONS = 10000
K = 1000
POPULATION_SIZE = 80
OFFSPRING_SIZE = 12
MUTATION_PROBABILITY = 0.15

MIGRATION_RATE = 100
N_SUB_POPULATIONS = 4

Example of Island Model *without* self-adapting mutation probability

In [23]:
fitness = lab9_lib.make_problem(1)
population = initialize_population(K, POPULATION_SIZE, fitness)

best_fitness, fitness_calls = island_model(population, NUM_GENERATIONS, MUTATION_PROBABILITY, OFFSPRING_SIZE,
                                                single_gene_mutate, one_cut_xover, fitness,
                                                tournament_selection, steady_state, terminate,
                                                N_SUB_POPULATIONS, random_migrate, MIGRATION_RATE,
                                                verbose=False, self_adaptive_prob=False)

best_fitness, fitness_calls

(0.98, 100080)

Example of Island Model *with* self-adapting mutation probability

In [24]:
fitness = lab9_lib.make_problem(1)
population = initialize_population(K, POPULATION_SIZE, fitness)

best_fitness, fitness_calls = island_model(population, NUM_GENERATIONS, MUTATION_PROBABILITY, OFFSPRING_SIZE,
                                                single_gene_mutate, one_cut_xover, fitness,
                                                tournament_selection, steady_state, terminate,
                                                N_SUB_POPULATIONS, random_migrate, MIGRATION_RATE,
                                                verbose=False, self_adaptive_prob=True)

best_fitness, fitness_calls

(0.965, 141456)

Test all xover / mutation methods implemented and store the result in a Pandas dataframe

In [25]:
columns = ['fitness function', 'xover', 'mutate', 'migrate', 'self adapting probability', 'best fitness value', 'fitness calls']
df2 = pd.DataFrame(columns=columns)

In [26]:
counter = 0
fitness_numbers = [1, 2, 5, 10]
self_adaptive_options = [False, True]

for fitness_number in fitness_numbers:
    fitness = lab9_lib.make_problem(fitness_number)
    # use the same initial population for each combination of xover and mutation
    population = initialize_population(K, POPULATION_SIZE, fitness)
    for xover in xovers:
        for mutate in mutates:
                for migrate in migrates:
                    for self_adaptive_prob in self_adaptive_options:

                        best_fitness, fitness_calls = island_model(
                                                                population, 
                                                                NUM_GENERATIONS, MUTATION_PROBABILITY, OFFSPRING_SIZE,
                                                                mutate, xover, fitness,
                                                                tournament_selection, steady_state, terminate,
                                                                N_SUB_POPULATIONS, random_migrate, MIGRATION_RATE,
                                                                verbose=False, self_adaptive_prob=self_adaptive_prob
                                                                )
                        
                        # store the result in the dataframe
                        df2.loc[counter] = [
                            fitness_number,
                            xover.__name__,
                            mutate.__name__,
                            migrate.__name__,
                            self_adaptive_prob,
                            str(round(best_fitness*100, 2))+"%",
                            fitness_calls
                            ]
                        counter += 1

Print the result

In [28]:
from tabulate import tabulate

print(tabulate(df2, headers = 'keys', tablefmt = 'psql'))

+-----+--------------------+---------------+--------------------------+----------------+-----------------------------+----------------------+-----------------+
|     |   fitness function | xover         | mutate                   | migrate        | self adapting probability   | best fitness value   |   fitness calls |
|-----+--------------------+---------------+--------------------------+----------------+-----------------------------+----------------------+-----------------|
|   0 |                  1 | one_cut_xover | single_gene_mutate       | random_migrate | False                       | 95.8%                |           82992 |
|   1 |                  1 | one_cut_xover | single_gene_mutate       | random_migrate | True                        | 96.7%                |          134736 |
|   2 |                  1 | one_cut_xover | variable_strength_mutate | random_migrate | False                       | 93.1%                |           84720 |
|   3 |                  1 | one_cut_xov