Copyright **`(c)`** 2023 Ivan Magistro Contenta `<s314356@polito.it>`  
[`https://github.com/ivanmag22/computational-intelligence`](https://github.com/ivanmag22/computational-intelligence)

# LAB9

Write a local-search algorithm (eg. an EA) able to solve the *Problem* instances 1, 2, 5, and 10 on a 1000-loci genomes, using a minimum number of fitness calls. That's all.

### Deadlines:

* Submission: Sunday, December 3 ([CET](https://www.timeanddate.com/time/zones/cet))
* Reviews: Sunday, December 10 ([CET](https://www.timeanddate.com/time/zones/cet))

Notes:

* Reviews will be assigned  on Monday, December 4
* You need to commit in order to be selected as a reviewer (ie. better to commit an empty work than not to commit)

In [1010]:
import logging
from random import random, choice, choices, randint, shuffle, uniform
from functools import reduce
from collections import namedtuple
from dataclasses import dataclass
from copy import copy
import functools

from pprint import pprint

import numpy as np

import lab9_lib

In [1011]:
PROBLEM_SIZE = 1_000
POPULATION_SIZE = 110 # 50
OFFSPRING_SIZE = 70 # 60
N_INSTANCES = 1 # as stride to calculate the one-max
TOURNAMENT_SIZE = 14
MAX_GEN_SAME_FIT = 300

NUM_GENERATIONS = 100_000

In [1012]:
fitness = lab9_lib.make_problem(N_INSTANCES)
for n in range(POPULATION_SIZE):
    ind = choices([0, 1], k=PROBLEM_SIZE)
    print(f"{''.join(str(g) for g in ind)}: {fitness(ind):.2%}" if PROBLEM_SIZE<=10 else f"ind {n}: {fitness(ind):.2%}")

print(fitness.calls)

ind 0: 47.60%
ind 1: 48.30%
ind 2: 49.30%
ind 3: 49.60%
ind 4: 49.50%
ind 5: 48.20%
ind 6: 49.00%
ind 7: 50.00%
ind 8: 51.90%
ind 9: 51.10%
ind 10: 54.40%
ind 11: 48.80%
ind 12: 48.30%
ind 13: 50.50%
ind 14: 53.90%
ind 15: 50.40%
ind 16: 47.70%
ind 17: 50.70%
ind 18: 47.80%
ind 19: 52.70%
ind 20: 49.10%
ind 21: 49.60%
ind 22: 48.60%
ind 23: 49.60%
ind 24: 49.10%
ind 25: 51.20%
ind 26: 51.50%
ind 27: 52.00%
ind 28: 52.80%
ind 29: 50.50%
ind 30: 47.80%
ind 31: 49.50%
ind 32: 50.70%
ind 33: 51.60%
ind 34: 50.20%
ind 35: 50.90%
ind 36: 51.50%
ind 37: 49.00%
ind 38: 52.20%
ind 39: 49.90%
ind 40: 49.90%
ind 41: 51.80%
ind 42: 49.80%
ind 43: 51.10%
ind 44: 51.40%
ind 45: 47.90%
ind 46: 49.60%
ind 47: 50.40%
ind 48: 45.90%
ind 49: 49.70%
ind 50: 52.60%
ind 51: 50.80%
ind 52: 49.00%
ind 53: 49.50%
ind 54: 47.30%
ind 55: 50.90%
ind 56: 50.50%
ind 57: 50.20%
ind 58: 51.20%
ind 59: 49.80%
ind 60: 51.60%
ind 61: 49.80%
ind 62: 52.70%
ind 63: 51.90%
ind 64: 49.80%
ind 65: 49.30%
ind 66: 52.10%
ind 6

### Individual definition and parent selection

In [1013]:
Individual = namedtuple("Individual", ["genome", "fitness"])

def tournament(population, tournament_size=2):
    return max(choices(population, k=tournament_size), key=lambda i: i.fitness)

# the voting mechanism is a sort of roulette wheel.
def voting(population, tournament_size=None):

    total_weight = sum(x.fitness for x in population)

    #the sum has to be equal to one
    normalized_weights = [x.fitness / total_weight for x in population]

    shuffled_weights = [(index,w) for index, w in enumerate(normalized_weights)]
    shuffle(shuffled_weights)
    random_value = uniform(0, 1)

    current_weight_sum = 0
    for index, normalized_weight in shuffled_weights:
        current_weight_sum += normalized_weight
        if current_weight_sum >= random_value:
            return population[index]

### Mutation and Recombination mechanisms

In [1014]:
def one_cut(g1, g2):
    cut = randint(0, PROBLEM_SIZE)
    return g1[:cut] + g2[cut:]

def n_cut(g1, g2, n):
    cut = PROBLEM_SIZE//(n+1)

    o = ()
    g = g1
    for i in range(n+1):
        start = i*cut
        if i == n:
            end = PROBLEM_SIZE
        else:
            end = i*cut + cut
        o += g[start : end]
        if g == g1:
            g = g2
        else:
            g = g1

    return o

def uniform_xover(g1, g2):
    o = []
    for i in range(len(g1)):
        o.append(choice([g1[i],g2[i]]))
    
    return tuple(o)

def mutation(g):
    point = randint(0, PROBLEM_SIZE - 1)
    return g[:point] + (1 - g[point],) + g[point + 1 :]

## Initial Population

In [1015]:
population = list()

for genome in [tuple(choices([0, 1], k=PROBLEM_SIZE)) for _ in range(POPULATION_SIZE)]:
    population.append(Individual(genome, fitness(genome)))

logging.info(f"init: pop_size={len(population)}; max={max(population, key=lambda i: i.fitness)[1]}")

In [1016]:
NUM_NICHES = 1  # niches, islands are the same thing...
NUM_MIGRANTS = 10
RANDOM_NICHES = True
MIGRATION_RATE = 50

niches = list()
p = copy(population)
if RANDOM_NICHES:
    shuffle(p)
else:
    p.sort(key= lambda x: x.fitness)

for i in range(NUM_NICHES):
    niches.append(p[i*POPULATION_SIZE // NUM_NICHES : (i+1)*POPULATION_SIZE // NUM_NICHES])

# niches -> list of NUM_NICHES lists containing Individual objects

logging.info(f"init: pop_size={len(population)}; max={max(population, key=lambda i: i.fitness)[1]}")

## Evolution
### Segregation

È possibile implementare una sorta di self-adaptation in termini di miglioramenti rispetto alla generazione precedente per passare dall'exploration all'exploitation e viceversa.

Bisogna pensare a cosa fare con la popolazione:
- **steady-state GA**: da x genitori passo a y figli avendo una popolazione di x+y individui e poi torno a x individui per la generazione successiva
In a steady-state GA, only a subset of the population is replaced in each generation, typically a small percentage of the total population.
- **generational GA**: come un cambio generazionale, sostituisco i genitori con i figli in modo da avere sempre x individui
In a generational GA, the entire population is replaced in each generation. *Problem*: we could lose the best solution.
- **elitism**: Retains the best individuals from the current generation directly into the next generation without modification.

In [1017]:
def steady_state(population, offspring, size):
    """
    steady-state GA: from x parents we obtain y children and from x+y individuals we take only x ones
    """
    
    population += offspring
    population = sorted(population, key=lambda i: i.fitness, reverse=True)[:size]

    return population

def generational(population, offspring, size):
    """
    generational GA: the entire population is replaced in each generation
    """
    
    population = offspring
    population = sorted(population, key=lambda i: i.fitness, reverse=True)[:size]

    return population

def elitism(population, offspring, size):
    """
    elitism: it retains the best individuals from the current generation directly into the next generation without modification
    """
    
    population = sorted(population, key=lambda i: i.fitness, reverse=True)[:size//4]
    offspring = sorted(offspring, key=lambda i: i.fitness, reverse=True)[:size*3//4]

    return population + offspring

Qual è il giusto compromesso tra minor numero di chiamate a fitness e miglior individio (con miglior fitness)?
- minor numero di chiamate a fitness (la fitness viene chiamata quando si vuole applicare o quando si vuole ordinare la popolazione o il niche in base alla fitness)
    - bisogna avere pochi individui da valutare -> pochi individui e più generazioni
- migliore fitness:
    - bisogna avere individui diversi e mutarli/ricombinarli. Bisogna sapere quando applicare la exploration e quando la exploitation -> dipende dalla generazione.
    Si può fare il tutto mediante un confronto tra generazione precedente e generazione corrente in termini di miglioramento:
        - rapporto tra media delle fitness della generazione precedente e media delle fitness della generazione corrente
        - rapporto tra differenza tra fitness massima e fitness minima della generazione precedente e differenza tra fitness massima e fitness minima della generazione corrente
        - confronto tra il miglior individuo della generazione precedente ed il miglior individuo della generazione corrente

**Condizioni di terminazione**:
- Migliore soluzione trovata
- Numero totale di valutazione: la migliore per valutare un algoritmo
- Numero totale di step: utile in ambienti paralleli
- Wall-clock time: richiesto in pratica, contesti industriali
- Steady-state (a.k.a., give up if chance of improvement is low): leggermente utile, tipicamente usato ad altre condizioni

## Model that works on the whole population

In [1018]:
"""selection = steady_state
best_fitness = 0
counter = 0

for g in range(NUM_GENERATIONS):
    #print("Generation n.", g)
    tmp_fitness = 0
    
    offspring = list()
    for i in range(OFFSPRING_SIZE):
        if random() < 0.3:
            p = tournament(population)
            o = mutation(p.genome)
        else:
            p1 = tournament(population)
            p2 = tournament(population)
            o = one_cut(p1.genome, p2.genome)
        f = fitness(o)

        if f > tmp_fitness: # offsprings' fitness
            tmp_fitness = f
        offspring.append(Individual(o, f))
    population = selection(population, offspring, POPULATION_SIZE)
    
    if tmp_fitness <= best_fitness:
        counter += 1
        
        if counter >= 100:
            print("break at generation n.", g, "fitness: ", best_fitness)
            break
        
    elif tmp_fitness > best_fitness:
        counter = 0

        best_fitness = tmp_fitness
        if best_fitness == 1:
            print("break at generation n.", g, "fitness: ", best_fitness)
            break
    
    print("Generation",g ,"counter:", counter, "best_fitness:", best_fitness, "difference:",best_fitness-tmp_fitness)

population = sorted(population, key=lambda i: i.fitness, reverse=True)[:POPULATION_SIZE]
print(population[0])
print(fitness.calls)"""

'selection = steady_state\nbest_fitness = 0\ncounter = 0\n\nfor g in range(NUM_GENERATIONS):\n    #print("Generation n.", g)\n    tmp_fitness = 0\n    \n    offspring = list()\n    for i in range(OFFSPRING_SIZE):\n        if random() < 0.3:\n            p = tournament(population)\n            o = mutation(p.genome)\n        else:\n            p1 = tournament(population)\n            p2 = tournament(population)\n            o = one_cut(p1.genome, p2.genome)\n        f = fitness(o)\n\n        if f > tmp_fitness: # offsprings\' fitness\n            tmp_fitness = f\n        offspring.append(Individual(o, f))\n    population = selection(population, offspring, POPULATION_SIZE)\n    \n    if tmp_fitness <= best_fitness:\n        counter += 1\n        \n        if counter >= 100:\n            print("break at generation n.", g, "fitness: ", best_fitness)\n            break\n        \n    elif tmp_fitness > best_fitness:\n        counter = 0\n\n        best_fitness = tmp_fitness\n        if best

## Parallel methods
If you want to use one of this you should comment the code cell regarding the model with the whole population and the one regarding the parallel model that we are not interested in

### Island Model

In [1019]:
"""strategy = (generational, elitism)
index = 0
selection = strategy[index] # steady_state, generational, elitism"""
parent_selection = voting   # tournament
selection = elitism # steady_state, generational
best_fitness = 0
counter = 0
prv_fitness = [0 for _ in range(NUM_NICHES)]
tmp_fitness = [0 for _ in range(NUM_NICHES)]

for i,n in enumerate(niches):
    nich = copy(n)
    f = sorted(nich, key=lambda i: i.fitness, reverse=True)[0].fitness
    tmp_fitness[i] = f

for g in range(NUM_GENERATIONS):

    # migration
    if g % MIGRATION_RATE == 0 and g != 0 and NUM_NICHES > 1:
        migrants = [[] for _ in range(NUM_NICHES)]
        for i,n in enumerate(niches):
            for _ in range(NUM_MIGRANTS):
                migrants[i].append(n.pop())
            """
            c = 0
            for j in range(len(niches)):
                if i != j:
                    if c != (NUM_NICHES - 1) - 1:
                        niches[j] += migrants[c*NUM_MIGRANTS//(NUM_NICHES-1):(c+1)*NUM_MIGRANTS//(NUM_NICHES-1)]
                    else:
                        niches[j] += migrants[c*NUM_MIGRANTS//(NUM_NICHES-1):]
                c += 1
            """
        for i in range(NUM_NICHES):
            for j in range(NUM_NICHES):
                if i != j:
                    for _ in range(NUM_MIGRANTS//(NUM_NICHES-1)):
                        niches[i].append(migrants[j].pop())
        for i in range(len(migrants)):
            if migrants[i]:
                for _ in range(len(migrants[i])):
                    l = [len(x) for x in niches]
                    niches[l.index(min(l))].append(migrants[i].pop())
        
        tmp_fitness = [0 for _ in range(NUM_NICHES)]

        for i,n in enumerate(niches):
            nich = copy(n)
            f = sorted(nich, key=lambda i: i.fitness, reverse=True)[0].fitness
            tmp_fitness[i] = f
    
    # mutation and recombination
    for j,n in enumerate(niches):
        n_size = len(n)
        offspring = list()
        for i in range(OFFSPRING_SIZE):
            p1 = parent_selection(n, tournament_size=TOURNAMENT_SIZE)
            p2 = parent_selection(n, tournament_size=TOURNAMENT_SIZE)
            
            # mutation probability with self-adaptation
            if prv_fitness[j] * 11/10 >= tmp_fitness[j]:  # exploration
                r = random()
                if r > 0.65:
                    o = one_cut(p1.genome, p2.genome)
                elif r > 0.35 and r <= 0.65:
                    o = uniform_xover(p1.genome, p2.genome)
                else:
                    o = n_cut(p1.genome, p2.genome, randint(2, PROBLEM_SIZE*3//4))
                o = mutation(o)
            else:   # exploitation
                p = parent_selection(n, tournament_size=TOURNAMENT_SIZE)
                o = mutation(p.genome)
            f = fitness(o)
            prv_fitness[j] = tmp_fitness[j]
            
            """
            r = random()
            if r > 0.65:
                o = one_cut(p1.genome, p2.genome)
            elif r > 0.35 and r <= 0.65:
                o = uniform_xover(p1.genome, p2.genome)
            else:
                o = n_cut(p1.genome, p2.genome, randint(2, PROBLEM_SIZE*3//4))
            o = mutation(o)
            f = fitness(o)
            """

            if f > tmp_fitness[j]: # offsprings' fitness
                tmp_fitness[j] = f

            offspring.append(Individual(o, f))
        n = selection(n, offspring, n_size)
    
    m = max(tmp_fitness)
    if m <= best_fitness:
        counter += 1
        
        if counter >= MAX_GEN_SAME_FIT:
            print("break at generation n.", g, "fitness: ", best_fitness)
            break
        
    elif m > best_fitness:
        counter = 0

        best_fitness = m

    """index = 1 - index
    selection = strategy[index]"""
    
    print("Generation",g ,"counter:", counter, "best_fitness:", best_fitness, "fitness:", tmp_fitness)

population = [item for sublist in niches for item in sublist]
population = sorted(population, key=lambda i: i.fitness, reverse=True)[:POPULATION_SIZE]
print(population[0])
print(fitness.calls)

prv_fitness[j] * 11/10 >= tmp_fitness[j] False
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fit

prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitness[j] True
prv_fitness[j] * 11/10 >= tmp_fitn

### Segregation

In [1020]:
"""
num = NUM_NICHES
selection = elitism #steady_state
parent_selection = voting   # tournament
best_fitness = 0
counter = 0
tmp_fitness = [0 for _ in range(num)]

for i,n in enumerate(niches):
    nich = copy(n)
    f = sorted(nich, key=lambda i: i.fitness, reverse=True)[0].fitness
    tmp_fitness[i] = f

for g in range(NUM_GENERATIONS):

    if g % 300 == 0 and g != 0 and NUM_GENERATIONS - g >= 300 and num > 1:  # it puts together two niches if the number of niches is higher than 1
        fa = [] # list of fitness average
        for n in niches:    # for each niche in the list of niches
            fl = [x.fitness for x in n]
            fa.append(functools.reduce(lambda a, b: a+b, fl) / len(fl)) # average fitness for each niche
        l = list(enumerate(fa))
        l.sort(key = lambda x : x[:][1])    # x is a tuple: first index -> element in the list, second index -> element in the tuple
        combined_niche = niches[l[0][0]] + niches[l[1][0]]   # it takes only the first two elements (the ones with the lowest fitness average); list of Individual objects
        combined_niche = sorted(combined_niche, key=lambda i: i.fitness, reverse=True)[:len(niches[l[0][0]]) + len(niches[l[1][0]])]
        remaining_niches = [niches[i[0]] for i in l[2:]] if num>2 else []    # list of x lists containing 
        niches = [remaining_niches[0], combined_niche] if remaining_niches else [combined_niche]
        num -= 1
        
        tmp_fitness = [0 for _ in range(num)]

        for i,n in enumerate(niches):
            nich = copy(n)
            f = sorted(nich, key=lambda i: i.fitness, reverse=True)[0].fitness
            tmp_fitness[i] = f
    
    for j,n in enumerate(niches):
        n_size = len(n)
        offspring = list()
        for i in range(OFFSPRING_SIZE):
            p1 = parent_selection(n, tournament_size=TOURNAMENT_SIZE)
            p2 = parent_selection(n, tournament_size=TOURNAMENT_SIZE)
            if random() > 0.65:
                o = one_cut(p1.genome, p2.genome)
            else:
                o = n_cut(p1.genome, p2.genome, randint(2, PROBLEM_SIZE*3//4))
            o = mutation(o)
            f = fitness(o)

            if f > tmp_fitness[j]: # offsprings' fitness
                tmp_fitness[j] = f

            offspring.append(Individual(o, f))
        n = selection(n, offspring, n_size)
    
    m = max(tmp_fitness)
    if m <= best_fitness:
        counter += 1
        
        if counter >= 300:
            print("break at generation n.", g, "fitness: ", best_fitness)
            break
        
    elif m > best_fitness:
        counter = 0

        best_fitness = m

    print("Generation",g ,"counter:", counter, "best_fitness:", best_fitness, "fitness:", tmp_fitness)

population = [item for sublist in niches for item in sublist]
population = sorted(population, key=lambda i: i.fitness, reverse=True)[:POPULATION_SIZE]
print(population[0])
print(fitness.calls)
"""

'\nnum = NUM_NICHES\nselection = elitism #steady_state\nparent_selection = voting   # tournament\nbest_fitness = 0\ncounter = 0\ntmp_fitness = [0 for _ in range(num)]\n\nfor i,n in enumerate(niches):\n    nich = copy(n)\n    f = sorted(nich, key=lambda i: i.fitness, reverse=True)[0].fitness\n    tmp_fitness[i] = f\n\nfor g in range(NUM_GENERATIONS):\n\n    if g % 300 == 0 and g != 0 and NUM_GENERATIONS - g >= 300 and num > 1:  # it puts together two niches if the number of niches is higher than 1\n        fa = [] # list of fitness average\n        for n in niches:    # for each niche in the list of niches\n            fl = [x.fitness for x in n]\n            fa.append(functools.reduce(lambda a, b: a+b, fl) / len(fl)) # average fitness for each niche\n        l = list(enumerate(fa))\n        l.sort(key = lambda x : x[:][1])    # x is a tuple: first index -> element in the list, second index -> element in the tuple\n        combined_niche = niches[l[0][0]] + niches[l[1][0]]   # it takes 