# Development | Implement the different parts and validations of GA without the SAC condition

In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

%matplotlib widget

In [2]:
import os
import logging
import numpy as np
import matplotlib.pyplot as plt
import jupyter_black

jupyter_black.load()

# Project
from src.data_connectors import read_input_files

In [3]:
instance = 333
instances_path = "../data/input/HRTInstances"
ins_x = read_input_files.read_file(os.path.join(instances_path, f"Instance_{instance}.txt"))

In [4]:
ins_x.df_setup

Unnamed: 0,Humans,Robots,WorkingSpaces
0,2,3,3


In [5]:
ins_x.df_resources

Unnamed: 0,Type,Id
0,H,0
1,H,1
2,R,2
3,R,3
4,R,4


In [6]:
ins_x.df_workingspace_resources

Unnamed: 0,WorkingSpace,Resource
0,1,2
1,1,0
2,1,1
3,2,3
4,2,0
5,2,1
6,3,4
7,3,0
8,3,1


In [7]:
ins_x.df_workingspace_id.head()

Unnamed: 0,WorkingSpace,Id
0,1,1
1,1,2
2,1,3
3,1,4
4,1,5


## 0. The Chromosome

In [8]:
from src.genetic_algorithm.chromosome import Chromosome

## 1. Generate initial population

In [None]:
from src.genetic_algorithm import first_population

# Modes
possible_modes = first_population.get_possible_modes(ins_x)

# Tasks
n_tasks = first_population.get_total_number_of_tasks_per_working_space(ins_x)

# Chromosomes
chromosomes = first_population.get_first_population(ins_x, possible_modes, n_tasks)

## 2. Verify feasibility of solutions

In [None]:
from src.genetic_algorithm import feasibility

In [None]:
ins_x.df_predecessor_sucessor.head()

In [None]:
precedence_feasible_chromosomes = []
for i in range(len(chromosomes)):
    chrom = chromosomes[i]
    if feasibility.is_chromosome_precedence_feasible(ins_x, chrom):
        precedence_feasible_chromosomes.append(chrom)

print(f"From {len(chromosomes)} to {len(precedence_feasible_chromosomes)}")

In [None]:
task_mode_feasible_chromosomes = []
for i in range(len(precedence_feasible_chromosomes)):
    chrom = precedence_feasible_chromosomes[i]
    if feasibility.is_chromosome_task_mode_feasible(ins_x, chrom):
        task_mode_feasible_chromosomes.append(chrom)

print(f"From {len(precedence_feasible_chromosomes)} to {len(task_mode_feasible_chromosomes)}")

## 3. Duplication of the remaining workspaces

In [None]:
from src.genetic_algorithm import replication

In [None]:
# Generate new orders
chromosome = task_mode_feasible_chromosomes[0]
chromosome

In [None]:
chromosome_with_replication = replication.update_chromosome_with_replication(ins_x, chromosome)

In [None]:
chromosome_with_replication

In [None]:
# for all feasible chromosomes
replicated_chromosomes = []
for chromosome in task_mode_feasible_chromosomes:
    replicated_chromosomes.append(replication.update_chromosome_with_replication(ins_x, chromosome))

len(replicated_chromosomes)

## 4. Check feasibility of all workspaces independently

In [None]:
precedence_feasible_chromosomes = []
for i in range(len(replicated_chromosomes)):
    chrom = replicated_chromosomes[i]
    if feasibility.is_chromosome_precedence_feasible(ins_x, chrom):
        precedence_feasible_chromosomes.append(chrom)

print(f"From {len(replicated_chromosomes)} to {len(precedence_feasible_chromosomes)}")

In [None]:
task_mode_feasible_chromosomes = []
for i in range(len(precedence_feasible_chromosomes)):
    chrom = precedence_feasible_chromosomes[i]
    if feasibility.is_chromosome_task_mode_feasible(ins_x, chrom):
        task_mode_feasible_chromosomes.append(chrom)

print(f"From {len(precedence_feasible_chromosomes)} to {len(task_mode_feasible_chromosomes)}")

## 5. Generate next population

In [None]:
from src.genetic_algorithm import time_allocation
from src.genetic_algorithm import fitness
from src.genetic_algorithm import next_population

In [None]:
# for all replicated chromosomes
time_allocated_chromosomes = []
for chromosome in replicated_chromosomes:
    time_allocated_chromosomes.append(time_allocation.get_all_time_allocations(ins_x, chromosome))

print(len(time_allocated_chromosomes))

# for all results
makespan_all_chromosomes = []
for chromosome_time_allocation in time_allocated_chromosomes:
    makespan_all_chromosomes.append(time_allocation.find_makespan(chromosome_time_allocation))

print(len(makespan_all_chromosomes))

fittest_chromosomes, fittest_makespan = fitness.keep_fittest_n_chromosomes(
    task_mode_feasible_chromosomes, makespan_all_chromosomes, 100
)

print(len(fittest_chromosomes))

In [None]:
fittest_chromosomes[0]

In [None]:
new_generation = next_population.generate_next_population_with_crossover(
    fittest_chromosomes, fittest_makespan
)

In [None]:
new_generation[0]

In [None]:
import copy

logging.debug(f"generate_next_population() - 0 {len(fittest_chromosomes)}")
new_chromosomes = copy.deepcopy(fittest_chromosomes)

In [None]:
logging.debug(f"generate_next_population() - 1 {len(new_chromosomes)}")
original_fittest_chromosomes = copy.deepcopy(fittest_chromosomes)
new_generation = next_population.generate_next_population_with_crossover(
    original_fittest_chromosomes, fittest_makespan
)
generation_to_mutate = copy.deepcopy(new_generation)
new_chromosomes.extend(new_generation)
logging.debug(f"generate_next_population() - 2 {len(new_chromosomes)}")

In [None]:
probability = 0.8

new_mutated_population = []
for chromosome in generation_to_mutate:
    mutated_chromosome = copy.deepcopy(chromosome)
    mutated_chromosome = next_population.swap_mutation_at_probability(
        mutated_chromosome, probability
    )
    new_mutated_population.append(mutated_chromosome)
new_chromosomes.extend(new_mutated_population)
logging.debug(f"generate_next_population() - 3 {len(new_chromosomes)}")

In [None]:
new_chromosomes = next_population.remove_duplicated_chromosomes(copy.deepcopy(new_chromosomes))
logging.debug(f"generate_next_population() - 4 {len(new_chromosomes)}")

In [None]:
new_chromosomes[0]

### Check feasibility of complete new generation


In [None]:
precedence_feasible_chromosomes = []
for i in range(len(new_chromosomes)):
    chrom = new_chromosomes[i]
    if feasibility.is_chromosome_precedence_feasible(ins_x, chrom):
        precedence_feasible_chromosomes.append(chrom)

print(f"From {len(new_chromosomes)} to {len(precedence_feasible_chromosomes)}")

In [None]:
task_mode_feasible_chromosomes = []
for i in range(len(precedence_feasible_chromosomes)):
    chrom = precedence_feasible_chromosomes[i]
    if feasibility.is_chromosome_task_mode_feasible(ins_x, chrom):
        task_mode_feasible_chromosomes.append(chrom)

print(f"From {len(precedence_feasible_chromosomes)} to {len(task_mode_feasible_chromosomes)}")

# Complete GA

In [9]:
from src.genetic_algorithm.genetic_algorithm import *

In [47]:
instance_number = 217
instances_path = "../data/input/HRTInstances"
instance = read_input_files.read_file(
    os.path.join(instances_path, f"Instance_{instance_number}.txt")
)

In [48]:
max_limit_time_sec = 3


population = generate_first_population(instance)
logging.info(f"Size first population: {len(population)}")
previous_population = None
is_better_than_previous = True
start_time = datetime.datetime.now()
iteration_time = 0
probability = 0.9
iteration = 0
# for latency assessment
min_makespan = float("inf")
better_times_seconds = max_limit_time_sec
feasible_population = keep_feasible_chromosomes(instance, population)
logging.info(f"Size of feasible population: {len(feasible_population)}")

In [66]:
import itertools
import logging

from src.data_connectors.read_input_files import Instance
from src.genetic_algorithm.chromosome import Chromosome


def flatten(lst: list) -> list:
    return list(itertools.chain(*lst))


def is_int(string: str) -> bool:
    try:
        int(string)
        return True
    except ValueError:
        return False


def split_collaboration_into_ints(collab_str: str) -> tuple[int, int]:
    val_1, val_2 = collab_str.split("-")
    return int(val_1), int(val_2)


def generate_modes_of_working_space(
    instance: Instance,
    initial_working_space: int,
    working_space: int,
    modes: list[str],
) -> list[str]:
    initial_ws_resources = instance.df_workingspace_resources[
        instance.df_workingspace_resources.WorkingSpace == initial_working_space
    ].Resource.unique()
    ws_resources = instance.df_workingspace_resources[
        instance.df_workingspace_resources.WorkingSpace == working_space
    ].Resource.unique()
    resources_exclusively_in_new_ws = [m for m in ws_resources if m not in initial_ws_resources]

    new_ws_mode = []
    for m in modes:
        if is_int(m):
            if int(m) in ws_resources:
                new_ws_mode.append(m)
            elif len(resources_exclusively_in_new_ws) == 1:
                new_ws_mode.append(str(resources_exclusively_in_new_ws[0]))
            elif len(resources_exclusively_in_new_ws) > 1:
                logging.warning("WHAT TO DO?")
        else:
            m1, m2 = split_collaboration_into_ints(m)
            if (m1 in ws_resources) and (m2 in ws_resources):
                new_ws_mode.append(m)
            elif (m1 in ws_resources) and (m2 not in ws_resources):
                if len(resources_exclusively_in_new_ws) == 1:
                    updated_m = str(m1) + "-" + str(resources_exclusively_in_new_ws[0])
                    new_ws_mode.append(updated_m)
            elif (m2 in ws_resources) and (m1 not in ws_resources):
                if len(resources_exclusively_in_new_ws) == 1:
                    updated_m = str(resources_exclusively_in_new_ws[0]) + "-" + str(m2)
                    new_ws_mode.append(updated_m)
    return new_ws_mode


def generate_new_mode_with_replication(instance: Instance, chromosome: Chromosome) -> list[str]:
    # Generate modes for remaining working spaces
    all_new_modes = []
    logging.debug(
        f"generate_new_mode_with_replication(): Non replicated chromosome: {len(chromosome.mode)}"
    )
    for ws in instance.df_workingspace_resources.WorkingSpace.unique():
        if ws == 1:
            all_new_modes.append(chromosome.mode)
        else:
            mode_for_ws = generate_modes_of_working_space(instance, 1, ws, chromosome.mode)
            all_new_modes.append(mode_for_ws)
    logging.debug(
        f"generate_new_mode_with_replication(): Replicated crhomosome: {len(flatten(all_new_modes))}"
    )
    return flatten(all_new_modes)


def generate_new_orders_with_replication(instance: Instance, chromosome: Chromosome) -> list[int]:
    # Generate orders for remaining working spaces, knowing that they have
    # to follow a replication SAC method
    new_orders = []
    working_spaces = instance.df_workingspace_resources.WorkingSpace.unique()
    for i in range(len(working_spaces)):
        ws_order = [(x * len(working_spaces)) + i for x in chromosome.order]
        new_orders.append(ws_order)
    return flatten(new_orders)

In [67]:
replicated_chromosomes = []
logging.debug(
    f"add_replication_of_remaining_working_spaces(): Non replicated chromosomes: {len(feasible_population)}"
)
for chromosome in feasible_population:
    replicated_chromosomes.append(
        Chromosome(
            mode=generate_new_mode_with_replication(instance, chromosome),
            order=generate_new_orders_with_replication(instance, chromosome),
        )
    )

In [68]:
feasible_population[0]

Chromosome(mode=['0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0'], order=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19])

In [69]:
replicated_chromosomes[0]

Chromosome(mode=['0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0'], order=[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39])

In [70]:
np.sort(replicated_chromosomes[0].order)

array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
       34, 35, 36, 37, 38, 39])

In [43]:
chromosome = replicated_population[0]

tasks = [x for x in range(1, len(chromosome.order) + 1)]

allocated_prior_tasks = []
for n in range(len(chromosome.order)):
    print(chromosome.order.index(n))
    sucessor = tasks[chromosome.order.index(n)]
    predecessors = instance.df_predecessor_sucessor[
        instance.df_predecessor_sucessor["Sucessor"] == sucessor
    ].Predecessor.unique()
    if len([x for x in predecessors if x not in allocated_prior_tasks]) > 0:
        print(False)
    allocated_prior_tasks.append(sucessor)

0
20


ValueError: 2 is not in list

In [40]:
precedence_feasible_chromosomes = []
for i in range(len(replicated_population)):
    chrom = replicated_population[i]
    if feasibility.is_chromosome_precedence_feasible(instance, chrom):
        precedence_feasible_chromosomes.append(chrom)
logging.debug(f"From {len(replicated_population)} to {len(precedence_feasible_chromosomes)}")

ValueError: 2 is not in list

In [None]:
task_mode_feasible_chromosomes = []
for i in range(len(precedence_feasible_chromosomes)):
    chrom = precedence_feasible_chromosomes[i]
    if feasibility.is_chromosome_task_mode_feasible(instance, chrom):
        task_mode_feasible_chromosomes.append(chrom)
logging.debug(
    f"From {len(precedence_feasible_chromosomes)} to {len(task_mode_feasible_chromosomes)}"
)

In [38]:
len(feasible_population)

12

In [18]:
times_of_populations = add_times_and_find_makespan(instance, feasible_population)

In [19]:
times_of_populations[0]

2996

In [20]:
(
    fittest_population,
    fittest_makespan,
    _,
) = keep_fittest_chromosomes(replicated_population, times_of_populations, feasible_population)

In [21]:
fittest_population[0]

Chromosome(mode=['2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4'], order=[0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57, 1, 4, 7, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 40, 43, 46, 49, 52, 55, 58, 2, 5, 8, 11, 14, 17, 20, 23, 26, 29, 32, 35, 38, 41, 44, 47, 50, 53, 56, 59])

In [22]:
fittest_makespan[0]

1212

In [28]:
replicated_population = generate_next_population(fittest_population, fittest_makespan, probability)

In [29]:
replicated_population[0]

Chromosome(mode=['2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4'], order=[0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57, 1, 4, 7, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 40, 43, 46, 49, 52, 55, 58, 2, 5, 8, 11, 14, 17, 20, 23, 26, 29, 32, 35, 38, 41, 44, 47, 50, 53, 56, 59])

In [30]:
replicated_population[-1]

Chromosome(mode=['2', '1', '1', '0-2', '1', '0', '0-3', '0-2', '0-2', '1-2', '0-2', '0-2', '0-2', '1-2', '0', '2', '0', '3', '1', '1', '1-3', '1', '0', '0-3', '3', '0-4', '1-3', '1', '0-3', '2', '1', '0', '4', '0', '0', '3', '1', '0', '1-4', '1', '4', '4', '4', '2', '4', '0', '1-4', '1', '1-3', '1-4', '0-4', '0', '4', '1', '0', '4', '1', '0', '1', '1'], order=[0, 12, 6, 9, 58, 15, 45, 21, 24, 27, 30, 33, 36, 7, 42, 18, 48, 51, 44, 57, 1, 4, 39, 10, 13, 16, 19, 22, 25, 28, 26, 34, 37, 52, 43, 46, 49, 40, 55, 56, 2, 5, 8, 11, 3, 53, 20, 23, 31, 29, 32, 35, 38, 41, 54, 47, 50, 17, 14, 59])

In [31]:
feasible_population = keep_feasible_chromosomes(instance, replicated_population)

In [32]:
len(feasible_population)

285

In [None]:
while is_better_than_previous and (iteration_time < max_limit_time_sec):
    feasible_population = keep_feasible_chromosomes(instance, replicated_population)
    logging.info(f"Size of feasible population: {len(feasible_population)}")
    if len(feasible_population) < 2:
        logging.warning(f"Population size: {len(feasible_population)}! Everything died!")
        logging.warning(f"Returning results.")

    times_of_populations = add_times_and_find_makespan(instance, feasible_population)
    (
        fittest_population,
        fittest_makespan,
        _,
    ) = keep_fittest_chromosomes(replicated_population, times_of_populations, feasible_population)
    logging.info(f"Fittest replicated: {len(fittest_population)}")
    is_better_than_previous = is_new_population_better_than_previous(
        fittest_population, previous_population
    )
    if is_better_than_previous:
        population = generate_next_population(fittest_population, fittest_makespan, probability)
        logging.info(f"Size new population: {len(population)}")
    if np.min(fittest_makespan) < min_makespan:
        min_makespan = np.min(fittest_makespan)
        better_times_seconds = (datetime.datetime.now() - start_time).total_seconds()
    end_time = datetime.datetime.now()
    iteration_time = (end_time - start_time).total_seconds()
    probability = exponential_mutation_probability(iteration)
    iteration += 1
    logging.info(
        f"==> Iteration: {iteration} Time: {iteration_time:.2f} seconds. Fittest solution: {np.min(fittest_makespan)}"
    )
    logging.info(f"Next iteration will have: probability = {probability:.4f}")