In [None]:
import numpy as np
import pandas as pd
from tqdm import tqdm
import random

In [None]:
# 讀檔並將其分別存進 mOrder (每個job的機器加工順序)、 pTime (對應的加工時間)
pTime = []
mOrder = []
file_name = 'rcmax_20_15_10.txt' # 指定問題
with open(file_name, 'r') as file:
    lines = file.readlines()
    NUM_JOB = int(lines[0].strip().split()[0]) # 工件數
    NUM_MACHINE = int(lines[0].strip().split()[1]) # 機器數
    for line in lines[1:]:
        numbers = line.strip().split()
        if len(numbers) % 2 != 0:
            continue
        p = []
        m = []
        for i in range(0, len(numbers), 2):
            x = int(numbers[i])
            y = int(numbers[i+1])
            m.append(x)
            p.append(y)
        mOrder.append(m)
        pTime.append(p)

In [None]:
# 設定初始解
def create_pop_new(mOrder, num_j, num_mc):
    np.random.seed(0)
    flag = True # 判斷該解是否可行
    while flag:
        mOrder = np.array(mOrder)
        rows, cols = mOrder.shape
        new_col = np.full((rows, 1), fill_value=-1, dtype=mOrder.dtype)
        mOrder = np.hstack((mOrder, new_col)) # 每個job的處理機台順序都以-1結尾，用來表示該job已全部做完

        sol = np.full((num_mc, num_j), fill_value = -1) # 存最終解
        job_seq = np.array([0 for i in range(num_j)]) # 為index，用來指定每個job現在應該去的機台(cw)
        sol_seq = np.array([0 for i in range(num_mc)]) # 為index，用來指定在最終解(sol)中的放置位置
        while sum(job_seq)<num_mc*num_j-1: #如果解還沒全部填完
            for machine in range(num_mc): # 持續遍歷每個機台以找尋目前可以加工的工件
                cw = mOrder[np.arange(mOrder.shape[0]), job_seq] # 現在的等候機台
                if np.size(np.where((cw == machine) & (job_seq != num_mc))[0])!=0: # 判斷cw中是否存在當前匹配機台且該機台還有工件尚未處理
                    j = random.choice(np.where(cw == machine)[0]) # 在符合的index中隨機挑選一個，index即為job ID
                    sol[machine][sol_seq[machine]] = j # 將其存進解的對應位置
                    job_seq[j] += 1 # 該job的機台順序+1，表示可以去下一個機台了
                    sol_seq[machine] += 1 # 該機台的解位置+1，表示可以放下一個要做的job了
        if -1 in sol:
            flag = True
        else:
            flag = False
    return sol.flatten().tolist() # 將維度 num_mc x num_j 的解壓平成維度 1 x (num_mc*num_j)

In [None]:
# 基因演算法
class GeneticAlgorithm(object):
    def __init__(self, NUM_JOB, NUM_MACHINE, pTime, mOrder, Pc, Pm, NUM_ITERATION=1000, optim=True):
        # 參數設定
        self.NUM_ITERATION = NUM_ITERATION # 世代數(迴圈數)
        self.NUM_JOB = NUM_JOB # 工件數
        self.NUM_MACHINE = NUM_MACHINE # 機台數
        self.pTime = pTime # 每個job在每個機台的加工時間
        self.mOrder = mOrder # 每個job的機器加工順序
        
        self.NUM_CHROME = 40  # 染色體個數
        self.NUM_BIT = self.NUM_JOB * self.NUM_MACHINE  # 染色體長度

        self.Pc = Pc # 交配率 (代表共執行Pc*NUM_CHROME/2次交配)
        self.Pm = Pm # 突變率 (代表共要執行Pm*NUM_CHROME*NUM_BIT次突變)

        self.NUM_PARENT = self.NUM_CHROME # 父母的個數
        self.NUM_CROSSOVER = int(self.Pc * self.NUM_CHROME / 2) # 交配的次數
        self.NUM_CROSSOVER_2 = self.NUM_CROSSOVER*2 # 上數的兩倍
        self.NUM_MUTATION = int(self.Pm * self.NUM_CHROME * self.NUM_BIT) # 突變的次數
        
        self.optim = optim

        np.random.seed(0)
        
    # 初始化群體
    def initPop(self):
        p = []
        for i in range(self.NUM_CHROME) :        
            a = create_pop_new(mOrder, NUM_JOB, NUM_MACHINE)
            p.append(a)
        return p

    def fitFunc(self, x): # 適應度函數
        S = np.zeros((self.NUM_JOB, self.NUM_MACHINE))    # S[i][j] = job i 在 machine j 的開始時間
        C = np.zeros((self.NUM_JOB, self.NUM_MACHINE))    # C[i][j] = job i 在 machine j 的完成時間
        B = np.zeros(self.NUM_MACHINE, dtype=int)    # B[j] = machine j目前可開始的時間
        opJob = np.zeros(self.NUM_JOB, dtype=int)    # opJob[i] = job i 現在的加工順序

        for i in range(self.NUM_BIT): # 跑過現在該做的job
            m = mOrder[x[i]][opJob[x[i]]] # 找到 job i 對應的 machine m
            # 設置開始時間
            if opJob[x[i]] != 0: # 不是該 machine 的第一個工作
                # 將開始時間設置為(該 machine 目前可開始時間)和(job i 在上一個 machine 的完成時間)的最大值
                S[x[i]][m] = max([B[m], C[x[i]][mOrder[x[i]][opJob[x[i]]-1]]])
            else: # 是該 machine 的第一個工作
                S[x[i]][m] = B[m] # 將開始時間設置為 0

            C[x[i]][m] = B[m] = S[x[i]][m] + pTime[x[i]][opJob[x[i]]] # 將完成時間和可開始時間更新為開始時間+加工時間
            opJob[x[i]] += 1     
        return -max(B) # 因為是最小化問題

    def evaluatePop(self, p): # 評估群體之適應度
        return [self.fitFunc(p[i]) for i in range(len(p))]

    def selection(self, p, p_fit): # 用二元競爭式選擇法來挑父母
        a = []
        for i in range(self.NUM_PARENT):
            [j, k] = np.random.choice(self.NUM_CHROME, 2, replace=False)  # 任選兩個index
            if p_fit[j] > p_fit[k]: # 擇優
                a.append(p[j].copy())
            else:
                a.append(p[k].copy())
        return a

    def crossover_one_point(self, p): # 用單點交配來繁衍子代
        a = []
        for i in range(self.NUM_CROSSOVER) :
            c = np.random.randint(1, self.NUM_BIT) # 隨機找出單點(不包含0)
            [j, k] = np.random.choice(self.NUM_PARENT, 2, replace=False) # 任選兩個index
            child1, child2 = p[j].copy(), p[k].copy()
            remain1, remain2 = list(p[j].copy()), list(p[k].copy())
            for m in range(self.NUM_BIT):
                if m < c :
                    remain2.remove(child1[m]) # 砍掉 remain2 中的值是 child1[m]
                    remain1.remove(child2[m]) # 砍掉 remain1 中的值是 child2[m]
            t = 0
            for m in range(self.NUM_BIT):
                if m >= c :
                    child1[m] = remain2[t]
                    child2[m] = remain1[t]
                    t += 1
            a.append(child1)
            a.append(child2)
        return a
    
    def mutation(self, p): # 突變
        for _ in range(self.NUM_MUTATION) :
            row = np.random.randint(self.NUM_CROSSOVER_2) # 任選一個染色體
            [j, k] = np.random.choice(self.NUM_BIT, 2, replace=False) # 任選兩個基因
            p[row][j], p[row][k] = p[row][k], p[row][j] # 此染色體的兩基因互換

    def sortChrome(self, a, a_fit): # a的根據a_fit由大排到小
        a_index = range(len(a)) # 產生 0, 1, 2, ..., |a|-1 的 list
        a_fit, a_index = zip(*sorted(zip(a_fit,a_index), reverse=True)) # a_index 根據 a_fit 的大小由大到小連動的排序
        return [a[i] for i in a_index], a_fit # 根據 a_index 的次序來回傳 a，並把對應的 fit 回傳

    def replace(self, p, p_fit, a, a_fit): # 適者生存
        b = np.concatenate((p,a), axis=0) # 把本代 p 和子代 a 合併成 b
        b_fit = p_fit + a_fit # 把上述兩代的 fitness 合併成 b_fit
        b, b_fit = self.sortChrome(b, b_fit) # b 和 b_fit 連動的排序
        return b[:self.NUM_CHROME], list(b_fit[:self.NUM_CHROME]) # 回傳 NUM_CHROME 個為新的一個世代

    def run(self):
        # ==== 主程式 ====
        pop = self.initPop() # 初始化 pop
        pop_fit = self.evaluatePop(pop)  # 算 pop 的 fit

        best_outputs = [] # 用此變數來紀錄每一個迴圈的最佳解 (new)
        best_outputs.append(np.max(pop_fit)) # 存下初始群體的最佳解

        mean_outputs = [] # 用此變數來紀錄每一個迴圈的平均解 (new)
        mean_outputs.append(np.average(pop_fit)) # 存下初始群體的最佳解

        best = {'sol':[], 'makespan':9999}
        for i in tqdm(range(self.NUM_ITERATION)):
            parent = self.selection(pop, pop_fit) # 挑父母
            offspring = self.crossover_one_point(parent) # 單點交配
            self.mutation(offspring) # 突變
            offspring_fit = self.evaluatePop(offspring) # 算子代的 fit
            pop, pop_fit = self.replace(pop, pop_fit, offspring, offspring_fit) # 取代

            best_outputs.append(np.max(pop_fit)) # 存下這次的最佳解
            mean_outputs.append(np.average(pop_fit)) # 存下這次的平均解
            
            if -pop_fit[0]<best['makespan']:
                best['makespan'] = -pop_fit[0]
                best['sol'] = pop[0]
        if self.optim:
            return -pop_fit[0]
        else:
            return best

In [None]:
# 優化基因演算法參數
import optunity
import optunity.metrics

def objective_function(crossover_rate, mutation_rate):
    jsga = GeneticAlgorithm(NUM_JOB, NUM_MACHINE, pTime, mOrder, crossover_rate, mutation_rate)
    performance = jsga.run()
    return performance

param_space = {'crossover_rate': [0.9, 1.0], 'mutation_rate': [0.0, 0.001]} # 設定要優化參數的範圍

optimal_params, optimal_value, _ = optunity.minimize(objective_function, num_evals=100, solver_name='particle swarm', **param_space)

print("Optimal parameters:", optimal_params)
print("Optimal fitness value:", optimal_value)

In [None]:
# 用優化好的交配率和突變率代入基因演算法求最佳解
Pc = optimal_params['crossover_rate']
Pm = optimal_params['mutation_rate']
jsga = GeneticAlgorithm(NUM_JOB, NUM_MACHINE, pTime, mOrder, Pc, Pm, NUM_ITERATION=3000, optim=False)
best = jsga.run()
print(best)