# Simulated Annealing as a Metaheuristic for the FSS Problem

In [1]:
import pandas as pd
import numpy as np
from itertools import permutations
import matplotlib.pyplot as plt
import matplotlib.patches as patches

import plotly.express as px
import plotly.figure_factory as ff

In [33]:
class SolveFSSP:
    """Schedule ``n`` jobs over ``m`` machines with per-machine idle windows.

    The class loads processing times and idle-time bounds from a CSV file,
    computes start/completion matrices for a given set of per-machine job
    orders, and exposes the makespan as an objective value.

    The annealing parameters passed to the constructor are only stored as
    attributes here; no method of this class reads them.
    """

    # Class-level default so instances created without __init__ still log.
    verbose = True

    def __init__(
        self,
        n: int,
        m: int,
        initial_temperature: float,
        final_temperature: float,
        cooling_rate: float,
        max_iterations: int,
        csv_path: str = "operation_times.csv",
        verbose: bool = True,
    ) -> None:
        """Store the problem size and parameters and load the dataset.

        Args:
            n: number of jobs.
            m: number of machines/operations.
            initial_temperature: stored as ``self.initial_temperature``.
            final_temperature: stored as ``self.final_temperature``.
            cooling_rate: stored as ``self.cooling_rate``.
            max_iterations: stored as ``self.max_iterations``.
            csv_path: dataset to load; defaults to the previously
                hard-coded ``"operation_times.csv"``.
            verbose: when true (the default) the debug trace is printed.
        """
        self.n = n  # number of jobs
        self.m = m  # number of machines/operations
        self.verbose = verbose
        self.load_dataset(csv_path)

        self.initial_temperature = initial_temperature
        self.final_temperature = final_temperature
        self.cooling_rate = cooling_rate
        self.max_iterations = max_iterations

        self.S = np.zeros((m, n))  # Job start matrix
        self.C = np.zeros((m, n))  # Job completion matrix
        self.solution = self.initial_solution()

    def _log(self, *args):
        """Print ``args`` like ``print`` does, but only when verbose."""
        if self.verbose:
            print(*args)

    def load_dataset(self, csv_path):
        """Read processing times and idle-time bounds from ``csv_path``.

        The columns ``r_a`` and ``d_a`` become ``self.r_hat`` and
        ``self.d_hat``; every remaining column becomes part of ``self.p``.
        The rest of the class indexes ``p`` as ``p[machine, job]`` and
        ``r_hat`` / ``d_hat`` by machine, so each CSV row is expected to
        describe one machine.

        Args:
            csv_path: path handed to ``pandas.read_csv``.
        """
        frame = pd.read_csv(csv_path)
        self.r_hat = np.array(frame["r_a"])
        self.d_hat = np.array(frame["d_a"])
        self.p = np.array(frame.drop(["r_a", "d_a"], axis=1))
        self._log("p:", self.p)

    def initial_solution(self):
        """Initialize the solution (one job permutation per machine).

        Returns:
            list[list[int]]: ``m`` rows, each the identity order
            ``0 .. n-1``.
        """
        return [list(range(self.n)) for _ in range(self.m)]

    def calculate_times(self, solution):
        """Calculate the start and completion time matrices S and C.

        Both matrices are indexed ``[machine][job]`` (by job id, not by the
        position of the job in the machine's order).

        Args:
            solution (list[list[int]]): matrix of permutations, one row
                per machine; ``solution[i][j]`` is the j-th job on machine i.

        Returns:
            tuple: (S, C) matrices of start and completion times
        """
        S = np.zeros((self.m, self.n))
        C = np.zeros((self.m, self.n))
        log = self._log

        for i in range(self.m):  # for each machine
            for j in range(self.n):  # for each position in the machine order
                job = solution[i][j]
                log(f"dealing with operation {i} of job {job}")

                if i == 0:  # first machine
                    if j == 0:
                        S[i][job] = 0
                    else:
                        # wait r_hat after the previous job on this machine
                        S[i][job] = C[i][solution[i][j - 1]] + self.r_hat[i]
                        log("S[i][job]: ", S[i][job])

                else:  # subsequent machines
                    if j == 0:
                        # NOTE: j - 1 is -1 here, so this trace line shows
                        # the last job of the row, not a predecessor.
                        log(i, solution[i][j - 1], C[i - 1][job])
                        # the first job starts as soon as the previous
                        # operation of the same job is done
                        S[i][job] = C[i - 1][job]
                    else:
                        # start after both the job's previous operation and
                        # the machine's previous job (plus r_hat) are done
                        S[i][job] = max(
                            C[i - 1][job], C[i][solution[i][j - 1]] + self.r_hat[i]
                        )
                        log(f"completion time of previous operation: {C[i - 1][job]}")
                        log(
                            f"completion time of curren/last task on machine: {C[i][solution[i][j-1]]}"
                        )
                        log(f"S[i][job]={S[i][job]}")

                # Fill C matrix
                C[i][job] = S[i][job] + self.p[i, job]
                log("S[i][job]: ", S[i][job])
                log("self.p[i, job]", self.p[i, job])
                log("C[i][job]: ", C[i][job])
                log("S:\n", S)
                log("C:\n", C, "\n")

                # Ensure idle time constraints hold
                if j > 0:
                    idle_time = S[i][job] - C[i][solution[i][j - 1]]
                    if idle_time < self.r_hat[i] or idle_time > self.d_hat[i]:
                        log(f"idle_time={idle_time} violated, adjusting ")
                        # adjust_row only repairs the "idle too long" case
                        self.adjust_row(solution, S, C, i, j, job)
                        log("updated S:\n", S)
                        log("updated C:\n", C)

        return S, C

    def adjust_row(self, solution, S, C, i, j, job):
        """Delay earlier jobs on machine ``i`` so no gap exceeds ``d_hat[i]``.

        Starting from the job at position ``j``, walk backwards along the
        machine's order. Whenever the gap between a job's start and its
        predecessor's completion is larger than ``d_hat[i]``, the
        predecessor is moved later so that it finishes exactly ``d_hat[i]``
        before the job starts. The walk stops at the first gap that is
        already within the bound. ``S`` and ``C`` are modified in place.

        Args:
            solution: matrix of permutations, one row per machine.
            S: start-time matrix, indexed ``[machine][job]``.
            C: completion-time matrix, indexed ``[machine][job]``.
            i: machine index.
            j: position of ``job`` in ``solution[i]``.
            job: id of the job at position ``j``.
        """
        while j > 0:
            prev_job = solution[i][j - 1]
            idle_time = S[i][job] - C[i][prev_job]
            if idle_time <= self.d_hat[i]:
                return
            # move previous job to finish precisely d_hat moments before
            C[i][prev_job] = S[i][job] - self.d_hat[i]
            # the duration is the moved job's own processing time (the
            # schedule position j must not be used as a job index here)
            S[i][prev_job] = C[i][prev_job] - self.p[i][prev_job]
            # continue with the job that was just moved
            j -= 1
            job = prev_job

    def objective_function(self, solution):
        """Return the largest completion time on the last machine.

        Args:
            solution: matrix of permutations, one row per machine.
        """
        _, C = self.calculate_times(solution)
        return C[-1].max()  # Last machine's last job completion time

    def plot_gantt(self, S, C, solution):
        """Show a Gantt chart built from the start/completion matrices.

        Args:
            S: start-time matrix, indexed ``[machine][job]``.
            C: completion-time matrix, indexed ``[machine][job]``.
            solution: matrix of permutations, one row per machine.
        """
        data = []
        for machine in range(self.m):
            for job in solution[machine]:
                # S and C are keyed by job id, so look the job up directly
                start_time = S[machine, job]
                completion_time = C[machine, job]
                data.append(
                    [
                        machine + 1,
                        job + 1,
                        start_time,
                        completion_time,
                        completion_time - start_time,
                    ]
                )

        df = pd.DataFrame(data, columns=["Task", "Job", "Start", "Finish", "Duration"])

        fig = ff.create_gantt(df, colors="Viridis", show_colorbar=True)
        fig.show()

In [32]:
# Build the solver for 5 jobs on 3 machines.
fssp = SolveFSSP(
    n=5,
    m=3,
    initial_temperature=1000,
    final_temperature=1,
    cooling_rate=0.95,
    max_iterations=10000,
)
initial_solution = fssp.initial_solution()

# Evaluate a hand-picked schedule: the same job order on all three machines.
S, C = fssp.calculate_times(np.array([[0, 1, 4, 2, 3]] * 3))

# Dump the inputs, then the resulting start and completion matrices.
for heading, values, tail in (
    ("p:\n", fssp.p, "\n"),
    ("r:\n", fssp.r_hat, "\n"),
    ("d:\n", fssp.d_hat, "\n\n"),
):
    print(heading, values, tail)
print(S, "\n")
print(C)

p: [[2 1 2 1 3]
 [1 2 1 2 1]
 [2 2 3 2 1]]
dealing with operation 0 of job 0
S[i][job]:  0.0
self.p[i, job] 2
C[i][job]:  2.0
S:
 [[0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]]
C:
 [[2. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]] 

dealing with operation 0 of job 1
S[i][job]:  3.0
S[i][job]:  3.0
self.p[i, job] 1
C[i][job]:  4.0
S:
 [[0. 3. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]]
C:
 [[2. 4. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]] 

dealing with operation 0 of job 4
S[i][job]:  5.0
S[i][job]:  5.0
self.p[i, job] 3
C[i][job]:  8.0
S:
 [[0. 3. 0. 0. 5.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]]
C:
 [[2. 4. 0. 0. 8.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]] 

dealing with operation 0 of job 2
S[i][job]:  9.0
S[i][job]:  9.0
self.p[i, job] 2
C[i][job]:  11.0
S:
 [[0. 3. 9. 0. 5.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]]
C:
 [[ 2.  4. 11.  0.  8.]
 [ 0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]] 

dealing with operation 0 of job 3
S[i][job]:  12.0
S[i][job]:  12.0
self.p[i, job