Imports

In [76]:
import pygad
import math
import random
import plotly.figure_factory as ff
from plotly.express import timeline

Helper Functions

In [77]:
def index_to_job(index):
    i = 0
    for order in orders:
        if i + 2 * len(available_tasks[order[0]]) > index:
            j = 0
            # find exact index
            for job in available_tasks[order[0]]:
                if i + 2*j == index:
                    return job
                j+=1
        else:
            i += 2 * len(available_tasks[order[0]])

def index_to_order(index):
    i = 0
    for o in range(len(orders)):
        if i + 2 * len(available_tasks[orders[o][0]]) > index:
            j = 0
            # find exact index
            for job in available_tasks[orders[o][0]]:
                if i + 2*j == index:
                    return o
                j+=1
        else:
            i += 2 * len(available_tasks[order[0]])

Environment Setup

In [78]:
last_slot = 1000
first_slot = 0
available_workstations = [2, 2, 3, 1, 2, 1, 1] # just put workstations for now (amount of workstation per type -> index = type)
available_resources = [2, 3, 1, 4] # just put workers for now (amount of worker per type -> index = type)
available_tasks = [ # recipes
    [0, 1, 2],
    [2, 3, 4],
    [4, 1, 0],
    [3, 2, 1]
]
job_durations = [ # rows = workstation idx, columns = jobs, 0 = jobs can't be done on this workstation
    [10, 20, 0, 0, 10],
    [0, 15, 10, 0, 0],
    [15, 20, 0, 10, 10],
    [10, 0, 10, 0, 10],
    [0, 0, 15, 5, 10],
    [0, 10, 0, 15, 0],
    [5, 10, 15, 10, 5]
]
workstation_resources = [ # rows = workstation idx, columns = amount of resources (workers) needed, 0 = worker type can't operate workstation
    [1, 0, 0, 1],         
    [0, 1, 1, 0],
    [1, 0, 1, 0],
    [0, 1, 0, 1],
    [1, 1, 0, 0],
    [0, 0, 1, 1],
    [2, 2, 1, 1]
]
job_resources = [ # rows = job idx, columns = amount of resources (materials needed), but ignore for now
    [0, 0, 0, 0],
    [0, 0, 0, 0],
    [0, 0, 0, 0],
    [0, 0, 0, 0],
    [0, 0, 0, 0]
]
orders = [  # <recipe, target delivery time slot>
    [0, last_slot],
    [1, last_slot],
    [2, last_slot],
    [3, last_slot],
    [1, last_slot]
]

GA Setup + Run

In [79]:
delivery_space = {'low': first_slot, 'high': last_slot}
workstation_space = {'low': 0, 'high': len(available_workstations)-1}
gene_space = []
input = []
for order in orders:
    for job in available_tasks[order[0]]:
        input.append(0)
        input.append(0)
j = 0
for i in range(len(input)): # set lower and upper bounds for mutation for each gene
    if j == 0:
        gene_space.append(workstation_space)
    else:
        gene_space.append(delivery_space)
    j+=1
    if j > 1:
        j = 0

def mutation_function(offsprings, ga_instance):
    for offspring in offsprings:
        p = 1 / (len(offspring)/2) # amount of jobs
        j = 0
        for i in range(len(offspring)):
            if j == 0:
                if random.random() < p:
                    options = []
                    job = index_to_job(i)
                    for w in range(len(job_durations)):
                        if job_durations[w][job] > 0:
                            options.append(w)
                    # mutate workstation assignment
                    offspring[i] = random.choice(options)
                    # mutate start time
                    offspring[i+1] = random.randint(0, last_slot)
            j += 1
            if j > 1:
                j = 0
                
    return offsprings

def is_feasible(solution):
    j = 0
    order = 0
    for i in range(len(solution)):
        if j == 0:
            job = index_to_job(i)
            
        if j == 1: # timeslot
            # check for last time slot
            if solution[i] + job_durations[solution[i-1]][job] > last_slot:
                return False
            # check for first slot
            if solution[i] < first_slot: # should never happen
                return False
        else: # assigned workstation
            # check for overlaps
            y = 0
            for x in range(len(solution)):
                if y == 0 and not y == i:
                    if solution[y] == solution[i]: # tasks run on the same workstation, check overlap
                        y_job = index_to_job(y)
                        i_start = solution[i+1]
                        y_start = solution[y+1]
                        i_duration = job_durations[solution[i]][job]
                        y_duration = job_durations[solution[y]][y_job]
                        i_end = i_start + i_duration
                        y_end = y_start + y_duration
                        if i_start >= y_start and i_start <= y_end:
                            return False
                        if i_end >= y_start and i_end <= y_end:
                            return False
                        if y_start >= i_start and y_start <= i_end:
                            return False
                        if y_end >= i_start and y_end <= i_end:
                            return False
                y+=1
                if y > 1:
                    y = 0
            # check for correct sequence
            prev_order = order
            order = index_to_order(i)
            if i != 0 and order == prev_order: # not the first job of the order, check previous jobs
                l = 1
                while index_to_order(i - 2*l) == order:
                    prev_start = solution[i - 2*l + 1]
                    prev_end = prev_start + job_durations[solution[i - 2*l]][index_to_job(i-2*l)]
                    start = solution[i+1]
                    if start <= prev_end:
                        return False
                    l+=1
        j+=1
        if j > 1:
            j = 0
    return True

def fitness_function(solution, solution_idx): # NOTE: PyGAD always maximizes
    fitness = 1#0
    if not is_feasible(solution):
        fitness += last_slot
    max = -float('inf')
    min = float('inf')
    j = 0
    for i in range(len(solution)):
        if j == 1: # time assignment
            start = solution[i]
            end = start + job_durations[solution[i-1]][index_to_job(i-1)]
            if end > max:
                max = solution[i]
            if start < min:
                min = solution[i]
        j+=1
        if j > 1:
            j = 0
    fitness += abs(max - min)
    return -fitness

num_genes = len(input)
num_generations = 1000
num_parents_mating = 25
sol_per_pop = 50
init_range_low = 0
init_range_high = last_slot
parent_selection_type = 'rws'
keep_parents = 1
crossover_type = 'two_points'
mutation_type = mutation_function
mutation_percentage_genes = 10 # not needed for custom mutation functions
fitness_func = fitness_function
gene_type = int

ga_instance = pygad.GA(num_generations=num_generations, num_parents_mating=num_parents_mating, fitness_func=fitness_func, sol_per_pop=sol_per_pop, num_genes=num_genes, init_range_low=init_range_low, init_range_high=init_range_high, parent_selection_type=parent_selection_type, keep_parents=keep_parents, crossover_type=crossover_type, mutation_type=mutation_type, mutation_percent_genes=mutation_percentage_genes, gene_type=gene_type, gene_space=gene_space)
ga_instance.run()
solution, solution_fitness, solution_idx = ga_instance.best_solution()
print("Parameters of the best solution : {solution}".format(solution=solution))
print("Fitness value of the best solution = {solution_fitness}".format(solution_fitness=solution_fitness))


Parameters of the best solution : [  2 137   5 759   3 789   6 467   4 539   4 563   0 319   6 376   6 804
   5 147   1 458   5 672   1 200   5 475   6 722]
Fitness value of the best solution = -668


Visualize Result

In [80]:
def get_colors(n): 
    ret = [] 
    r = int(random.random() * 256) 
    g = int(random.random() * 256) 
    b = int(random.random() * 256) 
    step = 256 / n 
    for i in range(n): 
        r += step 
        g += step 
        b += step 
        r = int(r) % 256 
        g = int(g) % 256 
        b = int(b) % 256 
        ret.append((r,g,b))  
    return ret

def visualize(solution, solution_fitness, orders, durations):
    colors = {}
    rgb_values = get_colors(len(orders))
    for i in range(len(orders)):
        colors[str(orders[i])] = f'rgb({rgb_values[i][0]}, {rgb_values[i][1]}, {rgb_values[i][2]})'
    data = []
    j = 0
    for i in range(len(solution)):
        if j == 0:
            job = index_to_job(i)
            order = index_to_order(i)
            label = f'W{solution[i]}'
            start = solution[i+1]
            end = start + durations[solution[i]][job]
            data.append(
                        dict(Task=label, Start=start, Finish=end, Resource=f'{orders[order]}')
                    )
        j+=1
        if j > 1:
            j = 0

    fig = ff.create_gantt(data, colors=colors, index_col='Resource', show_colorbar=True,
                        group_tasks=True, title=f'Orders made: {orders}, Fitness of best result: {solution_fitness}', showgrid_x=True)
    #fig = timeline(data, x_start="Start", x_end="Finish", y="Task",color = "Resource",template = "gridon")
    fig.show()

visualize(solution, solution_fitness, orders, job_durations)