<a href="https://colab.research.google.com/github/mprocz/QAP-GeneticAlgorithm_MAB-UCB/blob/main/QAP_AG_MABUCB.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Code

## Imports


In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
# This cell only works when the notebook runs on Google Colab.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import re
import numpy as np
import pandas as pd
from typing import List
import numpy.random as npr
from math import ceil, floor, sqrt, log
from random import randint, uniform, seed, sample, shuffle

# libs used in tests
import os
import json
import timeit
import seaborn as sns
from tqdm import tqdm
from scipy import stats
import matplotlib.pyplot as plt
from timeit import default_timer as timer

## Data

In [None]:
class Data():
    """
    This class stores the instance file data.

    All attributes are class-level, so the class itself is used as a shared
    store (``Data.size``, ``Data.flows``, ...) and is never instantiated in
    this file. ``get_file_data`` is the function that fills it.

    :attribute instanceName: str
    :attribute size: int
    :attribute flows: List[List[int]]
    :attribute distances: List[List[int]]
    """

    # Label given to the loaded instance (the ``name`` argument of get_file_data).
    instanceName: str = ""
    # First integer of the instance file; both matrices are size x size.
    size: int = 0
    # First matrix of the instance file, split into rows of ``size`` values.
    flows: List[List[int]] = []
    # Second matrix of the instance file, split into rows of ``size`` values.
    distances: List[List[int]] = []

def open_instance_file(filePath: str) -> List[List[int]]:
    """
    __author__: Felipe T. Peruci https://github.com/devLIPEr

    This function opens the instance data file available at (https://coral.ise.lehigh.edu/data-sets/qaplib/).

    Every non-blank line is split on whitespace and converted to a list of
    integers; blank lines are skipped.

    :param filePath: str -> path of the instance file
    :return: List[List[int]] -> one list of integers per non-blank line
    :raises FileNotFoundError: the file does not exist
    :raises OSError: the file exists but cannot be read
    :raises ValueError: a token in the file is not an integer
    """

    try:
        with open(filePath, 'r') as f:
            instance = []
            for line in f:
                tokens = line.split()
                if tokens:
                    instance.append([int(token) for token in tokens])
            return instance
    except FileNotFoundError:
        print("File not found")
        raise
    except (OSError, ValueError) as exc:
        # Report the real cause instead of claiming the file is missing.
        print(f"Could not read instance file: {exc}")
        raise

def get_file_data(filePath: str, name: str="") -> None:
    """
    __author__: Felipe T. Peruci https://github.com/devLIPEr

    This function separates each data contained in the opened file.

    The first integer of the file is taken as the instance size. Whole rows
    are then consumed until at least size*size values were read for the first
    matrix (stored in ``Data.flows``), and again for the second matrix
    (stored in ``Data.distances``). Both are split into rows of ``size``
    values.

    ``Data`` is only written once the whole file was parsed successfully. On
    any error a message is printed and ``Data`` keeps its previous content.

    :param filePath: str -> path of the instance file
    :param name: str -> label stored in ``Data.instanceName``
    """

    try:
        fileData = open_instance_file(filePath)

        size = fileData[0][0]
        cells = size * size
        cursor = 1  # index of the next unread row of fileData

        flowB = []
        while len(flowB) < cells:
            flowB.extend(fileData[cursor])
            cursor += 1

        distB = []
        while len(distB) < cells:
            distB.extend(fileData[cursor])
            cursor += 1

        flows = [flowB[i : i + size] for i in range(0, len(flowB), size)]
        distances = [distB[i : i + size] for i in range(0, len(distB), size)]
    except Exception as exc:
        print("Error extracting data. \nError: ", type(exc), '\n', (exc), '\n')
        return

    # Commit everything together so a failed load never leaves Data half-updated.
    Data.size = size
    Data.flows = flows
    Data.distances = distances
    Data.instanceName = name

## Result

In [None]:
def compare_result(filePath: str, best, printInfo: int=1) -> List:
    """
    __author__: Felipe T. Peruci https://github.com/devLIPEr

    This function compares the result obtained by a run with the best known
    result made available at (https://coral.ise.lehigh.edu/data-sets/qaplib/).

    The reference cost is read as the second integer of the first line of the
    solution file (commas are treated as separators).

    :param filePath: str -> path of the solution file
    :param best: Individual -> best individual found by the algorithm (only ``best.cost`` is read)
    :param printInfo: int -> when 1, a message is printed if the reference cost was matched
    :return: List -> [best.cost - reference cost, reference cost]; the first
        entry is negative when ``best`` beats the reference.
        ['-1', '-1'] (strings) when the file cannot be read or parsed.
    """

    try:
        with open(filePath) as f:
            result = [([int(a) for a in line.replace(',', ' ').split()]) for line in f]
        cost = result[0][1]
        error = best.cost - cost
    except Exception as exc:
        print('File not found. \n', type(exc), '\n', (exc), '\n')
        return ['-1', '-1']

    if error == 0 and printInfo == 1:
        print('Best possible found.\n')

    # The gap is returned whatever printInfo is; printInfo only controls printing.
    return [error, cost]

## Algorithm

In [None]:
# Deprecated
class Evaluations():
    """
    This class stores an evaluation counter.

    The counter is a class attribute: ``Individual.costCalculation`` adds one
    to it on every call and ``GeneticAlgorithm.__init__`` resets it to zero.
    """

    # Number of cost evaluations performed since the last reset.
    evaluations: int = 0

In [None]:
class Individual():
    """
    This class represents a solution for the problem analyzed.

    :attribute chromosome: list -> permutation of range(Data.size)
    :attribute cost: int
    :attribute generation: int

    Individuals are compared (``<``, ``>``) and subtracted (``-``) by cost.
    """

    def __init__(self, chromosome: List = None, generation: int = 1, cost: int = None):
        """
        :param chromosome: List -> candidate genes; repaired in place if its
            length equals Data.size, otherwise replaced by a random permutation
        :param generation: int -> generation in which the individual was created
        :param cost: int -> when given, the individual only carries this cost:
            ``chromosome`` is ignored and stored as an empty list
        """
        if cost is None:
            genes = [] if chromosome is None else chromosome
            self.chromosome: List[int] = self.generateChromosome(genes)
            self.cost: int = self.costCalculation()
        else:
            self.chromosome: List[int] = []
            self.cost: int = cost
        self.generation: int = generation

    def __sub__(self, other):
        return self.cost-other.cost

    def __lt__(self, other):
        return self.cost < other.cost

    def __gt__(self, other):
        return self.cost > other.cost

    def generateChromosome(self, chromosome: List) -> List:
        """
        This method creates a random solution or fixes an existing one.

        If ``chromosome`` has exactly Data.size genes it is repaired in place
        and returned: for every value of range(Data.size) that appears more
        than once only the last occurrence is kept, then the values that are
        missing are appended in ascending order. Genes outside
        range(Data.size) are left untouched.

        Otherwise a new random permutation of range(Data.size) is returned.
        """

        if len(chromosome) != Data.size:
            # NOTE: seed() without arguments re-seeds the generator, so a seed
            # set by the caller does not survive this call.
            seed()
            return sample(range(Data.size), Data.size)

        validGenes = range(Data.size)
        seen = set()
        keptReversed = []
        # Walk right-to-left so the LAST occurrence of a duplicate survives.
        for gene in reversed(chromosome):
            if gene in validGenes:
                if gene in seen:
                    continue
                seen.add(gene)
            keptReversed.append(gene)
        keptReversed.reverse()

        chromosome[:] = keptReversed
        chromosome.extend(gene for gene in validGenes if gene not in seen)
        return chromosome

    def costCalculation(self) -> int:
        """
        This method calculates the solution cost.

        The cost is the sum over all (i, j) of
        Data.distances[i][j] * Data.flows[chromosome[i]][chromosome[j]].
        Every call increments Evaluations.evaluations.
        """

        size = Data.size
        genes = self.chromosome
        cost = 0
        for i in range(size):
            distanceRow = Data.distances[i]
            flowRow = Data.flows[genes[i]]
            for j in range(size):
                cost += distanceRow[j] * flowRow[genes[j]]
        Evaluations.evaluations += 1
        return cost

    def mutation(self) -> None:
        """
        This method mutates its own solution and recalculates its cost.

        Two distinct positions are drawn at random and their genes swapped.
        With fewer than two positions there is nothing to swap, so the
        individual is left unchanged.
        """

        if Data.size < 2:
            return
        point1, point2 = sample(range(Data.size), 2)
        self.chromosome[point1], self.chromosome[point2] = (
            self.chromosome[point2], self.chromosome[point1])
        self.cost = self.costCalculation()

In [None]:
class GeneticAlgorithm():
    """
    This class represents the Genetic Algorithm.

    :attribute nIndividualsPerGeneration: int
    :attribute nGenerations: int
    :attribute mutationRate: int -> mutation chance in percent (0-100)
    :attribute elitismRate: float
    :attribute nChildren: int
    :attribute tournamentSize: int
    :attribute similarity: float
    :deprecated attribute maxEvaluations: int -> instance size multiplied by evaluationMultiplier
    :attribute operator: list -> name of all operators
    :attribute crossOperator: string -> current operator
    :attribute gen: int -> current generation
    :attribute best: list -> list that stores the best overall
    :attribute bestIndividualPerGeneration: list
    :attribute currentPopulation: list
    :attribute operatorTime: list -> time taken for each operator to execute
    :attribute allOperatorCounter: List -> list that stores how many times a operator is applied
    :attribute mab: MAB_UCB1 -> bandit that selects the crossover operator
    """

    def __init__(self,
                 nIndividualsPerGeneration: int = 100,
                 nGeneration: int = 50,
                 mutationRate: int = 5,
                 elitismRate: float = 0.01,
                 nChildrens: int = 1,
                 tournamentSize: int = 20,
                 similarityRate: float = 0.95,
                 evaluationMultiplier: int =100,
                 crossOperator: str = None):

        self.nIndividualsPerGeneration: int = nIndividualsPerGeneration
        self.nGenerations: int = nGeneration
        self.mutationRate: int = mutationRate
        self.elitismRate: float = elitismRate
        self.nChildren: int = nChildrens
        self.tournamentSize: int = tournamentSize
        self.similarity: float = similarityRate
        self.maxEvaluations: int = Data.size * evaluationMultiplier
        self.operator: List[str] = ['onePoint', 'CX', 'GSX', 'VR']
        self.crossOperator: str = crossOperator
        self.gen: int = 1
        self.best: List[Individual] = []
        self.bestIndividualPerGeneration: List[Individual] = []
        # A new run starts with a fresh evaluation counter.
        Evaluations.evaluations = 0
        self.currentPopulation: List[Individual] = []
        self.operatorTime: List[float] = []
        self.allOperatorCounter: List[List[int]] = []

        # One bandit arm per crossover operator.
        self.mab = MAB_UCB1(len(self.operator), self.operator)
        #self.mab = MAB_UCB_TUNED(4, self.operator)

    def generatePopulation(self, population: List[Individual] = None) -> None:
        """
        This method generates a population of solutions.

        Random individuals are appended to ``self.currentPopulation`` until it
        holds ``nIndividualsPerGeneration`` members.

        :param population: accepted for backward compatibility; it is not used
        """

        missing = self.nIndividualsPerGeneration - len(self.currentPopulation)
        for _ in range(missing):
            self.currentPopulation.append(Individual([], self.gen))

    def tournament(self) -> Individual:
        """
        This method implements the Tournament Selection Operator.

        ``tournamentSize`` individuals (at most the whole population) are
        drawn without replacement and the cheapest one is returned.
        """

        # NOTE: seed() without arguments re-seeds the generator on every call.
        seed()
        nContenders = min(self.tournamentSize, len(self.currentPopulation))
        contenders = sample(self.currentPopulation, nContenders)
        return min(contenders, key=lambda ind: ind.cost)

    def OLDsendReward(self, parents: List[Individual], offspring: Individual) -> None:
        """
        This method sends the reward of the current operator to the MAB algorithm.

        Legacy boolean reward: True when the offspring is at least as good as
        one of the two parents.
        """
        improved = (offspring.cost <= parents[0].cost or offspring.cost <= parents[1].cost)
        self.mab.reward(improved)

    @staticmethod
    def _relativeGain(parent: Individual, child: Individual):
        """Cost reduction of ``child`` over ``parent``, relative to the parent cost when it is non-zero."""
        gain = parent.cost - child.cost
        return gain / parent.cost if parent.cost != 0 else gain

    def sendReward(self, parents: List[Individual], child: Individual, op: str) -> None:
        """
        This method sends the reward of the current operator to the MAB algorithm.

        The reward is the cost reduction of the child relative to a reference
        parent (negative when the child is worse). Exactly one reward is sent.

        * op == "MPX": the reference is the first parent the child is at
          least as good as, or the last parent if there is none.
        * any other op: the reference is parents[0] when the child is
          strictly cheaper than it, otherwise parents[1].

        __author__: Felipe T. Peruci https://github.com/devLIPEr
        """
        if op == "MPX":
            for parent in parents:
                if child.cost <= parent.cost:
                    self.mab.reward(self._relativeGain(parent, child))
                    return
            self.mab.reward(self._relativeGain(parents[-1], child))
        else:
            reference = parents[0] if child.cost < parents[0].cost else parents[1]
            self.mab.reward(self._relativeGain(reference, child))

    @staticmethod
    def _crossCX(parent_one: Individual, parent_two: Individual) -> List[int]:
        """Build the 'CX' gene list by copying position cycles alternately from each parent."""
        genes_one = parent_one.chromosome
        genes_two = parent_two.chromosome
        chrom_length = Data.size

        child = [-1] * chrom_length
        # Copies used only as "already visited" markers (-1 = visited).
        p1_copy = genes_one.copy()
        p2_copy = genes_two.copy()

        swap = True
        # count is incremented on every write, including the second write to a
        # cycle's start position, so it is only a stop criterion.
        count = 0
        pos = 0

        while count <= chrom_length:
            for i in range(chrom_length):
                if child[i] == -1:
                    pos = i
                    break
            if swap:
                while True:
                    child[pos] = genes_one[pos]
                    count += 1
                    pos = genes_two.index(genes_one[pos])
                    if p1_copy[pos] == -1:
                        swap = False
                        break
                    p1_copy[pos] = -1
            else:
                while True:
                    child[pos] = genes_two[pos]
                    count += 1
                    pos = genes_one.index(genes_two[pos])
                    if p2_copy[pos] == -1:
                        swap = True
                        break
                    p2_copy[pos] = -1

        # Positions the main loop did not reach before the counter ran out.
        for i in range(chrom_length):
            if child[i] == -1:
                child[i] = genes_two[i] if p1_copy[i] == -1 else genes_one[i]
        return child

    @staticmethod
    def _crossGSX(parent_one: Individual, parent_two: Individual) -> List[int]:
        """Build the 'GSX' gene list: grow left from a random pivot along parent one and right along parent two."""
        genes_one = parent_one.chromosome
        genes_two = parent_two.chromosome

        seed()
        pivot = randint(0, Data.size-1)

        p1Index = genes_one.index(pivot) - 1
        p2Index = genes_two.index(pivot) + 1

        newChromosome = [pivot]
        used = {pivot}
        p1 = p1Index >= 0
        p2 = p2Index < Data.size

        # Each side stops at the border of its parent or at the first gene
        # that is already part of the child.
        while p1 or p2:
            if p1:
                gene = genes_one[p1Index]
                if gene not in used:
                    newChromosome.insert(0, gene)
                    used.add(gene)
                    p1Index -= 1
                    if p1Index < 0:
                        p1 = False
                else:
                    p1 = False

            if p2:
                gene = genes_two[p2Index]
                if gene not in used:
                    newChromosome.append(gene)
                    used.add(gene)
                    p2Index += 1
                    if p2Index >= Data.size:
                        p2 = False
                else:
                    p2 = False

        # Genes not reached by either side are appended in random order.
        res = [i for i in range(Data.size) if i not in used]
        shuffle(res)
        newChromosome.extend(res)
        return newChromosome

    @staticmethod
    def _crossVR(parent_one: Individual, parent_two: Individual) -> List[int]:
        """Build the 'VR' gene list: keep positions where both parents agree, fill the rest at random."""
        genes_one = parent_one.chromosome
        genes_two = parent_two.chromosome
        size = Data.size

        offspring = [-1] * size
        for i in range(size):
            if genes_one[i] == genes_two[i]:
                offspring[i] = genes_one[i]

        kept = set(offspring)
        missing = [gene for gene in range(size) if gene not in kept]
        freeSlots = [i for i, gene in enumerate(offspring) if gene == -1]

        # Random assignment of the missing genes to the free positions.
        seed()
        shuffle(freeSlots)
        for slot, gene in zip(freeSlots, missing):
            offspring[slot] = gene
        return offspring

    @staticmethod
    def _crossOnePoint(parent_one: Individual, parent_two: Individual) -> List[int]:
        """Build the 'onePoint' gene list: head of parent one plus tail of parent two (repaired by Individual)."""
        p = randint(0, Data.size-1)
        return parent_one.chromosome[:p] + parent_two.chromosome[p:]

    def crossover(self, parents: List[Individual], operator: str) -> Individual:
        """
        This method implements all Crossover Operators.

        The child is built by the requested operator, the operator is
        rewarded through ``sendReward`` (before any mutation), and the child
        is then mutated with a chance of ``mutationRate`` percent.

        :param parents: List[Individual] -> the two parents
        :param operator: str -> one of 'onePoint', 'CX', 'GSX', 'VR'
        :raises ValueError: ``operator`` is not a known operator name
        """

        builders = {
            'CX': self._crossCX,
            'GSX': self._crossGSX,
            'VR': self._crossVR,
            'onePoint': self._crossOnePoint,
        }
        if operator not in builders:
            raise ValueError(f"Unknown crossover operator: {operator!r}")

        genes = builders[operator](parents[0], parents[1])
        child = Individual(chromosome=genes, generation=self.gen)

        self.sendReward(parents, child, operator)

        seed()
        # randint(1, 100) is inclusive on both ends, so "<=" gives exactly
        # mutationRate chances in 100.
        if randint(1, 100) <= self.mutationRate:
            child.mutation()

        return child

    def generationsLoop(self) -> None:
        """
        This method represents the main loop of the Genetic Algorithm

        Each generation: sort the population by cost, optionally restart part
        of it, copy the elite, fill the rest with offspring of tournament
        winners using the operator chosen by the bandit, and replace the
        population with the best ``nIndividualsPerGeneration`` of the result.
        ``self.best[0]`` always holds the best individual seen so far.
        """
        def byCost(ind: Individual) -> int:
            return ind.cost

        popSize = self.nIndividualsPerGeneration
        nElite = floor(popSize*self.elitismRate)

        self.generatePopulation()
        self.currentPopulation.sort(key=byCost)
        self.best.append(self.currentPopulation[0])

        while (self.gen < self.nGenerations):
        #while (Evaluations.evaluations < self.maxEvaluations):
            operatorCounter = [0] * len(self.operator)

            self.gen += 1
            self.currentPopulation.sort(key=byCost)

            ### similarity verification
            # NOTE(review): this compares the MEAN COST of the population with
            # the similarity rate (0.95 by default), which looks unintended
            # for a "similarity" test. Kept as is; confirm the intended measure.
            similarity = sum(ind.cost for ind in self.currentPopulation) / popSize
            if similarity >= self.similarity:
                # Keep the best quarter and refill with random individuals.
                self.currentPopulation = self.currentPopulation[:ceil(popSize/4)]
                self.generatePopulation(self.currentPopulation)

            ### elitism selection
            newPopulation = list(self.currentPopulation[:nElite])

            ### tournament selection and crossovers
            offs = len(newPopulation)
            while (offs < popSize):
                ind1 = self.tournament()
                ind2 = self.tournament()
                for i in range(self.nChildren):
                    # One pull per child: crossover() sends exactly one reward,
                    # and the bandit expects one reward per pull.
                    #operatorRandomPosition = randint(0, 3)
                    #self.crossOperator = self.operator[operatorRandomPosition]
                    self.crossOperator = self.mab.pull()
                    operatorCounter[self.operator.index(self.crossOperator)] += 1

                    initialTime = timer()
                    offspring = self.crossover([ind1, ind2], self.crossOperator)
                    self.operatorTime.append(timer() - initialTime)
                    newPopulation.append(offspring)
                    offs += 1

            newPopulation.sort(key=byCost)

            ### population size verification
            # The new generation always replaces the old one; the slice drops
            # the surplus that nChildren > 1 can create.
            self.currentPopulation = newPopulation[:popSize]

            ### best individual verification
            if self.currentPopulation[0].cost < self.best[0].cost:
                self.best[0] = self.currentPopulation[0]

            self.allOperatorCounter.append(operatorCounter)

In [None]:
class MAB_UCB1():

    """
    This class implements MAB-UCB 1 Algorithm
    as a Hyper-Heuristic for a Genetic Algorithm

    :attribute numberOfOperators: int
    :attribute operators: List[str]
    :attribute trials: int
    :attribute armsPulled: List[int]
    :attribute cumulativeRewards: List[float]
    :attribute averageRewards: List[float]
    :attribute currentReward: List[Float]
    :attribute currentChosenArm: int
    """

    def __init__(self,
                 numbersOfOperators: int,
                 operators: List[str]):
        #constantes
        self.numbersOfOperators: int = numbersOfOperators
        self.operators: List[str] = operators
        self.rewardMult = 100
        #variáveis
        self.trials: int = 1
        self.armsPulled: List[int] = [0 for i in range(numbersOfOperators)]
        self.cumulativeRewards: List[int] = [0 for i in range(numbersOfOperators)]
        self.averageRewards: List[int] = [0 for i in range(numbersOfOperators)]
        self.currentReward: List[int] = [0 for i in range(numbersOfOperators)]
        self.currentChosenArm: int | None = None

    def updateValues(self) -> None:
        """
        This class updates attributes.
        """
        for i in range(self.numbersOfOperators):
            self.cumulativeRewards[i] += self.currentReward[i]
            self.averageRewards[i] = ( self.cumulativeRewards[i] / self.trials )


    def pull(self, returnStr: bool = True) -> int | str:
        """
        This class chooses an operator.
        """

        if self.currentChosenArm is not None:
            return "First, reward the last arm"

        for i in range(self.numbersOfOperators):
            if self.armsPulled[i] == 0:
                self.currentChosenArm = i
                self.armsPulled[i] += 1
                if returnStr: return self.operators[self.currentChosenArm]
                else: return self.currentReward.index(self.currentChosenArm)

        for i, operator in enumerate(self.operators):
            self.currentReward[i] = (self.averageRewards[i] + (sqrt((2 * log(self.trials)) / self.armsPulled[i])))

        self.currentChosenArm = self.currentReward.index(max(self.currentReward))
        #self.updateValues(self.currentChosenArm)

        self.trials += 1
        self.armsPulled[self.currentChosenArm] += 1

        if returnStr: return self.operators[self.currentChosenArm]
        else: return self.currentChosenArm

    def reward(self, rewardValue: int) -> None:
        """
        This class updates the rewards of the current chosen operator.
        """

        try:
            self.cumulativeRewards[self.currentChosenArm] += rewardValue * self.rewardMult
        except:
            print("First, pull an arm")
        self.updateValues()
        self.currentChosenArm = None
