In [1]:
import time
import pandas as pd
import numpy as np
import random as rand
import copy

In [2]:
class FactoryQueue:
    """One individual of the genetic algorithm: an ordering of processes.

    Attributes:
        genotype: dict mapping each process key to its list of steps. Every
            step is a two-item list ``[machine, time]``; ``calculate_time``
            treats item 0 as the machine identifier and item 1 as the number
            of time units the step still needs. The dict's key order is the
            scheduling priority.
        fenotype: cost of the ordering; overwritten by ``calculate_time``.
        list_of_processes: process keys in the same order as ``genotype``.
        num_cross_places: number of cut points drawn during crossover.
        num_mut_places: maximum number of swaps performed by one ``mutate``.
        mut_chance: total probability that ``mutate`` performs at least one
            swap.
    """

    def __init__(self,
                 genotype: dict,
                 fenotype: int,
                 list_of_processes: list,
                 num_cross_places: int,
                 num_mut_places: int,
                 mut_chance: float):

        self.genotype = genotype
        self.fenotype = fenotype
        self.list_of_processes = list_of_processes
        self.num_cross_places = num_cross_places
        self.num_mut_places = num_mut_places
        self.mut_chance = mut_chance

    def __str__(self):
        return f"Factory Queue, order of processes: {self.genotype}\nTheir cost: {self.fenotype}"

    def __repr__(self):
        return f"Factory Queue, order of processes: {self.genotype}, \n their cost: {self.fenotype}"

    @staticmethod
    def _append_missing(target: list, source: list) -> None:
        """Append to ``target`` the items of ``source`` it does not hold yet."""
        target += [x for x in source if x not in target]

    def __add__(self, other):
        """Cross this queue with ``other``.

        Returns:
            A tuple ``(child_a, child_b)`` of new ``FactoryQueue`` objects
            whose cost is reset to 0. Each child's order contains every
            process of the parents exactly once. The children inherit the
            crossover/mutation settings of ``self``.
        """
        process_count = len(self.list_of_processes)

        # randint(1, n - 2) needs at least three processes; with fewer there
        # is no interior cut point, so no cut is made at all.
        if process_count >= 3:
            split_places = sorted(
                rand.randint(1, process_count - 2)
                for _ in range(self.num_cross_places)
            )
        else:
            split_places = []

        list_of_processes_A = []
        list_of_processes_B = []
        for split in split_places:
            # Head from one parent, tail from the other, skipping processes
            # that were already placed so the result stays a permutation.
            self._append_missing(list_of_processes_A, self.list_of_processes[:split])
            self._append_missing(list_of_processes_A, other.list_of_processes[split:])
            self._append_missing(list_of_processes_B, other.list_of_processes[:split])
            self._append_missing(list_of_processes_B, self.list_of_processes[split:])

        # Whatever is still missing is appended in the other parent's order.
        self._append_missing(list_of_processes_A, other.list_of_processes)
        self._append_missing(list_of_processes_B, self.list_of_processes)

        # Steps are looked up in the parents themselves rather than in a
        # module-level global, so the class works outside the notebook.
        steps_by_process = {**other.genotype, **self.genotype}
        genotype_for_A = {p: steps_by_process[p] for p in list_of_processes_A}
        genotype_for_B = {p: steps_by_process[p] for p in list_of_processes_B}

        return FactoryQueue(genotype=genotype_for_A,
                            fenotype=0,
                            list_of_processes=list_of_processes_A,
                            num_cross_places=self.num_cross_places,
                            num_mut_places=self.num_mut_places,
                            mut_chance=self.mut_chance), \
               FactoryQueue(genotype=genotype_for_B,
                            fenotype=0,
                            list_of_processes=list_of_processes_B,
                            num_cross_places=self.num_cross_places,
                            num_mut_places=self.num_mut_places,
                            mut_chance=self.mut_chance)

    def calculate_time(self):
        """Simulate the schedule and store the elapsed time in ``fenotype``.

        The simulation advances one time unit per outer-loop iteration. It
        works on a deep copy, so ``genotype`` itself is left untouched.
        """
        working_setup = copy.deepcopy(self.genotype)
        current_time = 0
        current_machines_working = set()    # machines busy right now
        processes_running = dict()          # process -> machine it occupies
        processes_done = set()
        processes_running_loop = dict()     # processes still to be stepped this tick

        while len(processes_done) != len(working_setup):
            # Start every idle process whose next step can run now. Processes
            # are visited in genotype order, so earlier ones get the machine.
            for process in working_setup.keys():

                list_of_machines = working_setup[process]
                # enumerate gives the real position of the step; looking it up
                # with list.index() returns the wrong position when an earlier
                # step holds an equal [machine, time] pair.
                for machine_index, machine in enumerate(list_of_machines):

                    if machine_index == 0:
                        previous_machine_count_number = 0
                    else:
                        previous_machine_count_number = list_of_machines[machine_index - 1][1]

                    # The previous step is unfinished: nothing later may start.
                    if previous_machine_count_number != 0:
                        break

                    if (machine[0] not in current_machines_working) \
                            and (machine[1] != 0) \
                            and (process not in processes_running):
                        current_machines_working.add(machine[0])
                        processes_running[process] = machine[0]
                        processes_running_loop[process] = machine[0]
                        break

            # Processes that were already running before this tick must be
            # stepped as well.
            for process in processes_running:
                if process not in processes_running_loop:
                    processes_running_loop[process] = processes_running[process]

            # Take one time step: decrement the first unfinished step of every
            # running process and release its machine when it reaches zero.
            while len(processes_running_loop.keys()) != 0:

                for process in working_setup.keys():
                    if process in processes_running_loop.keys():
                        for machine in working_setup[process]:
                            if machine[1] == 0:
                                continue
                            else:
                                if machine[0] in current_machines_working:
                                    machine[1] = machine[1] - 1
                                    processes_running_loop.pop(process)
                                    if machine[1] == 0:
                                        current_machines_working.remove(machine[0])
                                        processes_running.pop(process)

                            break

            # Record finished processes. The scan stops at the first unfinished
            # one, so the set only reaches full size once everything is done.
            for process in working_setup.keys():
                if working_setup[process][-1][1] != 0:
                    break
                else:
                    processes_done.add(process)

            current_time += 1
        self.fenotype = current_time

    def mutate(self):
        """Swap random pairs of positions in the process order, in place.

        The number of swaps is drawn from ``0..num_mut_places``: zero has
        weight ``1 - mut_chance`` and the remaining counts share
        ``mut_chance`` equally. ``genotype`` is rebuilt afterwards so its key
        order matches ``list_of_processes``.
        """
        mutation_list = list(range(self.num_mut_places + 1))
        weights = [
            1 - self.mut_chance if i == 0 else self.mut_chance / self.num_mut_places
            for i in mutation_list
        ]
        how_many_muts = rand.choices(mutation_list, weights=weights)[0]

        position_count = len(self.list_of_processes)
        mutated = set()

        # A swap needs two distinct positions and a first position that has
        # not been used yet; stop once that is impossible, otherwise the loop
        # would never terminate.
        while how_many_muts != 0 and position_count >= 2 and len(mutated) < position_count:
            point_mut = rand.randint(0, position_count - 1)
            second_point_mut = rand.randint(0, position_count - 1)

            if point_mut not in mutated and point_mut != second_point_mut:
                self.list_of_processes[point_mut], self.list_of_processes[second_point_mut] = \
                    self.list_of_processes[second_point_mut], self.list_of_processes[point_mut]
                how_many_muts -= 1
                mutated.update((point_mut, second_point_mut))

        self.genotype = {process: self.genotype[process] for process in self.list_of_processes}

In [3]:
def generate_population(init_pop: int, num_cross_places: int, num_mut_places: int, mut_chance: float):
    """Build the starting population for the genetic algorithm.

    A single founder is created from the module-level ``sorted_setup``
    mapping (processes in its key order), its cost is evaluated once, and
    the population is filled with ``init_pop`` independent deep copies of it.

    Returns:
        list of ``init_pop`` identical, independent ``FactoryQueue`` objects.
    """
    founder = FactoryQueue(
        genotype=sorted_setup,
        fenotype=0,
        list_of_processes=list(sorted_setup.keys()),
        num_cross_places=num_cross_places,
        num_mut_places=num_mut_places,
        mut_chance=mut_chance,
    )
    # Evaluate before copying so every clone already carries the cost.
    founder.calculate_time()

    return [copy.deepcopy(founder) for _ in range(init_pop)]

def pairing(population: list, cross_chance: float):
    """Pair up neighbouring specimens and cross them with a given probability.

    Specimens are taken two at a time in list order. With probability
    ``cross_chance`` a pair is replaced by the two children produced by
    ``first + second``; otherwise both parents are carried over unchanged.
    When the population has an odd length, the last specimen has no partner
    and is carried over as is.

    Args:
        population: specimens supporting ``a + b`` -> pair of children.
        cross_chance: probability of crossing each pair.

    Returns:
        A new list with the same length as ``population``.
    """
    new_population = []

    # Only whole pairs are iterated; an unpaired trailing specimen would
    # otherwise cause an IndexError on population[i + 1].
    paired_end = len(population) - len(population) % 2

    for first in range(0, paired_end, 2):
        parent_a, parent_b = population[first], population[first + 1]
        if rand.random() <= cross_chance:
            child_a, child_b = parent_a + parent_b
            new_population.append(child_a)
            new_population.append(child_b)
        else:
            new_population.append(parent_a)
            new_population.append(parent_b)

    new_population.extend(population[paired_end:])

    return new_population

def natural_selection(population: list, selection_surv_percent: float, roulette: bool = False):
    """Select survivors and refill the population to its original size.

    Cost (``fenotype``) is minimised: a lower value is a better specimen.

    Args:
        population: specimens exposing a numeric ``fenotype`` attribute.
        selection_surv_percent: fraction of the population that survives.
            The resulting count is truncated to an integer and clamped to
            ``1..len(population)``.
        roulette: when True, survivors are drawn without replacement with
            probability proportional to ``1 / fenotype`` (uniformly if any
            cost is not positive) and the remaining slots are filled with
            random repeats of already selected specimens. When False, the
            cheapest specimens form an elite and every slot is drawn from
            that elite with replacement.

    Returns:
        A shuffled list of ``len(population)`` specimens. Every entry is an
        independent deep copy, so a specimen selected several times can
        later be mutated in place without changing its duplicates.
    """
    if not population:
        return []

    size = len(population)
    # An integer count avoids comparing a list length against a float, which
    # may never match; at least one specimen must survive.
    survivor_number = min(max(int(size * selection_surv_percent), 1), size)

    if roulette:
        costs = [specimen.fenotype for specimen in population]
        if min(costs) > 0:
            # Inverse cost, so cheaper schedules are more likely to survive.
            inverse = [1 / cost for cost in costs]
            total = sum(inverse)
            weights = [value / total for value in inverse]
        else:
            weights = None  # zero/negative cost: fall back to a uniform draw

        # Sampling indices keeps numpy from trying to coerce the specimens
        # themselves into an array.
        chosen = np.random.choice(size, size=survivor_number, replace=False, p=weights)
        selected = [population[index] for index in chosen]

        while len(selected) != size:
            selected.append(rand.choice(selected))
    else:
        ranked = sorted(population, key=lambda x: x.fenotype, reverse=False)
        elite = ranked[:survivor_number]
        chosen = np.random.choice(len(elite), size=size, replace=True)
        selected = [elite[index] for index in chosen]

    new_population = [copy.deepcopy(specimen) for specimen in selected]
    np.random.shuffle(new_population)

    return new_population

In [4]:
# Load the task sheet. Columns are consumed in adjacent pairs: columns 2*p and
# 2*p + 1 belong to process p, and every row contributes one
# [first value, second value] step to that process.
df = pd.read_excel('GA_task.xlsx')
process_count = len(df.columns) // 2

# Each entry is [process index, step, step, ...].
one_setup = [
    [proc] + [[df.iloc[row, 2 * proc], df.iloc[row, 2 * proc + 1]] for row in range(len(df))]
    for proc in range(process_count)
]

# Order the processes by the second value of their first step, largest first.
sorted_setup = sorted(one_setup, key=lambda entry: entry[1][1], reverse=True)

# Final shape: {process index: [step, step, ...]}, keeping the sorted order.
dictionary_of_setup = {entry[0]: entry[1:] for entry in sorted_setup}
sorted_setup = dictionary_of_setup

In [5]:
# Alternative values noted by the author (presumably for a parameter sweep):
#   init_pop         = [10, 20, 30]
#   mut_chance       = [0.1, 0.2, 0.3]
#   num_cross_places = [1, 5, 10]

# Size of the run.
init_pop = 100
epochs = 50

# Crossover: chance that a pair is crossed, and cut points per crossing.
cross_chance = 0.9
num_cross_places = 3

# Mutation: chance of mutating, and the maximum number of swaps.
mut_chance = 0.3
num_mut_places = 10

# Selection: fraction of the population kept as survivors.
selection_surv_percent = 0.4

In [6]:
# Run the genetic algorithm: mutate -> cross -> evaluate -> select, once per epoch.
population = generate_population(init_pop, num_cross_places, num_mut_places, mut_chance)

for epoch in range(epochs):

    # Each specimen gets a chance of being mutated in place.
    for specimen in population:
        if rand.random() <= mut_chance:
            specimen.mutate()

    population = pairing(population, cross_chance)

    # Children come back with no cost, so everything is (re)evaluated.
    for specimen in population:
        specimen.calculate_time()

    population = natural_selection(population, selection_surv_percent, False)

    # Per-epoch report: lowest-cost specimen plus mean and spread of the costs.
    scores = [specimen.fenotype for specimen in population]
    champion = min(population, key=lambda x: x.fenotype)
    print(f'Run {epoch + 1}')
    print(f'Best specimen genotype: {champion.list_of_processes}')
    print(f'Best specimen score: {champion.fenotype}')
    print(f'Mean score: {sum(scores) / len(population)}, with standard deviation: {np.std(scores)}')
    print('-------------------------------------------')

Run 1
Best specimen genotype: [9, 23, 6, 32, 18, 24, 44, 10, 31, 8, 27, 11, 3, 43, 25, 19, 47, 14, 1, 21, 5, 7, 17, 38, 34, 30, 35, 37, 12, 29, 48, 36, 2, 46, 0, 20, 45, 16, 33, 22, 13, 40, 42, 4, 28, 26, 39, 41, 15, 49]
Best specimen score: 2145
Mean score: 2178.3, with standard deviation: 7.114070564732965
-------------------------------------------
Run 2
Best specimen genotype: [9, 23, 6, 32, 18, 24, 44, 10, 31, 8, 27, 11, 3, 43, 25, 19, 47, 14, 1, 21, 5, 7, 17, 38, 34, 30, 35, 37, 12, 29, 48, 36, 2, 46, 0, 20, 45, 16, 33, 22, 13, 40, 42, 4, 28, 26, 39, 15, 41, 49]
Best specimen score: 2145
Mean score: 2177.76, with standard deviation: 7.0584984238859185
-------------------------------------------
Run 3
Best specimen genotype: [9, 23, 6, 32, 3, 24, 44, 10, 15, 8, 27, 11, 18, 0, 25, 41, 47, 14, 49, 39, 36, 42, 17, 33, 34, 30, 35, 37, 12, 29, 48, 5, 2, 46, 38, 20, 45, 31, 19, 16, 43, 1, 22, 13, 40, 7, 4, 28, 26, 21]
Best specimen score: 2134
Mean score: 2171.72, with standard deviatio