In [2]:
import pandas as pd
import numpy as np
import random as rng

# Settings

In [3]:
# Path of the Excel workbook that holds the source tables.
data_filepath = "PaintShop - September 2024.xlsx"

# Maps each table name used in this notebook to the worksheet it is read from.
data_sheet_names_by_table_name = dict(
    orders = "Orders",
    machines = "Machines",
    setups = "Setups",
)

# Fixed seed for the standard-library random generator.
rng.seed(420)

# Data

In [4]:
# Read every configured worksheet into a DataFrame, keyed by table name.
data = {table_name: pd.read_excel(data_filepath, sheet_name) for table_name, sheet_name in data_sheet_names_by_table_name.items()}

# We keep the source data in a dictionary in order to prevent confusion about what tables are source and what are derived.
# NOTE(review): the three reads below load the same worksheets a second time into names that later
# cells rebind to derived tables (`orders`, `machine_speeds`, `setups`) - presumably leftovers; confirm before removing.
orders = pd.read_excel(data_filepath, sheet_name = "Orders")
machine_speeds = pd.read_excel(data_filepath, sheet_name = "Machines")
setups = pd.read_excel(data_filepath, sheet_name = "Setups")

In [None]:
# Display the raw orders table as read from the workbook.
data["orders"]

In [None]:
# Display the first rows of the raw machines table.
data["machines"].head()

In [None]:
# Display the raw setups table as read from the workbook.
data["setups"]

### Prepare data

In [None]:
# Map every machine ID (the row label in the machines table) to the value in its "Speed" column.
machine_speeds = dict(zip(data["machines"].index, data["machines"]["Speed"]))
machine_speeds

In [9]:
# Machine IDs are the row labels of the machines table, exposed as an array.
machine_ids = data["machines"].index.values

In [None]:
# Give every colour an integer code: the row label is the code, the "name" column holds the colour name.
# Codes follow the order of first appearance in the "From colour" column of the setups table.
unique_colors = data["setups"]["From colour"].unique()
color_names_by_index = pd.DataFrame(
    {"name": list(unique_colors)},
    index = range(len(unique_colors)),
)
color_names_by_index

In [None]:
# Derived setups table: "c1"/"c2" hold the integer codes of the from/to colours, "time" the setup time.
setups = pd.DataFrame({
    code_column: data["setups"][name_column].apply(
        lambda color_name: color_names_by_index[color_names_by_index["name"] == color_name].index[0]
    )
    for code_column, name_column in (("c1", "From colour"), ("c2", "To colour"))
})
setups["time"] = data["setups"]["Setup time"]
setups

In [None]:
# Derived orders table: same row labels as the raw table, colours replaced by their integer codes.
orders = pd.DataFrame(index = data["orders"].index).assign(
    surface = data["orders"]["Surface"].values,
    color = [
        color_names_by_index[color_names_by_index["name"] == color_name].index[0]
        for color_name in data["orders"]["Colour"]
    ],
    deadline = data["orders"]["Deadline"].values,
    penalty = data["orders"]["Penalty"].values,
)
orders

In [None]:
def first_or_0(list):
    """Returns the first element of the given sequence, or 0 when the sequence is empty.

    Args:
        list: Any sized, indexable sequence (for example a Python list or a NumPy array).

    Returns:
        The element at position 0, or the integer 0 for an empty sequence.
    """
    return 0 if len(list) == 0 else list[0]

# Setup time needed when the order `order2_id` is processed directly after `order1_id`.
# An order followed by itself, or a colour pair that has no row in `setups`, counts as 0.
def _setup_time_between(order1_id, order2_id):
    if order1_id == order2_id:
        return 0
    matching_times = setups.loc[
        (setups["c1"] == orders.loc[order1_id, "color"]) &
        (setups["c2"] == orders.loc[order2_id, "color"]),
        "time"
    ].values
    return first_or_0(matching_times)

# Create table of order-to-order setup times: one row per preceding order, one column per following order.
setup_times = pd.DataFrame({
    order1_id: pd.Series(
        [_setup_time_between(order1_id, order2_id) for order2_id in orders.index.values],
        dtype="Int64",
    )
    for order1_id in orders.index.values
}).transpose()

# Display table
setup_times

In [None]:
# Processing time of every order (rows) on every machine (columns): order surface divided by machine speed.
process_times = pd.DataFrame({
    machine_id: orders["surface"].values / machine_speed
    for machine_id, machine_speed in machine_speeds.items()
})
process_times

# Classes

Metaheuristics:
- Multistart: Improve multiple starting solutions and take the best local optimum.
- Tabu Search: Reverse steps are forbidden (tabu list). Tabu moves are removed from the list after a number of moves. Stop after a certain number of iterations OR when all available moves are tabu. Keep track of the incumbent solution.
- Simulated Annealing: Uses a temperature and a cooling schedule. Always accepts improving moves; accepts non-improving moves with a probability based on the objective improvement and the temperature. Randomly choose a move and compute the gain. If it improves, accept; otherwise accept with probability e^(delta_obj / q). Update the incumbent solution. Reduce the temperature q. See graph in slides.
- Genetic algorithms: Many different ideas. Population of solutions.
 - Start with a number of random solutions.
 - Create new solutions by combining pairs.
 - Mutations sometimes.
 - Select survivors (elite).



In [None]:
import copy
import matplotlib.pyplot as plt
import itertools as iter
import matplotlib.patches as patches
import matplotlib.collections as coll
import matplotlib.patches as mpatches


class Schedule:
    """A class representing a solution to the paintshop-problem. This is essentially a wrapper for it's internal dict[int, list[int]] object (.order_queue) with some usefull functions.
    """
    
    
    # Constructor. Example call: "Solution()".
    def __init__(self):
        """Constructs an empty solution.
        """
        self.order_queue = [
            [] for i in machine_ids
        ]
        
    
    # Index getter for two indexes. Example call: "Solution()[i]".
    def __getitem__(self, index: tuple[int, int]) -> list[int] | int:
        """Gets the order index at the specified queue index for the specified machine index.

        Args:
            index tuple[int, int]: A tuple containing the machine index at position 0 and the queue index at position 1.

        Returns:
            list[int] or int: The order index at the specified position in the schedule or the queue for the specified machine if the second index is a slice.
        """
        return self.order_queue[index[0]][index[1]]
    
    # Index setter. Example call: "Solution()[i] += [1]"
    def __setitem__(self, index: tuple[int, int], queue: list[int] | int):
        """Sets the queue for the machine with the specified machine-number

        Args:
            machine_id (int): The index of the machine.
            queue (list[int]): The queue for the specified machine.
        """
        self.order_queue[index[0]][index[1]] = queue
    
    # Remove order in place (del Solution[0,0])
    def __delitem__(self, index: tuple[int, int]):
        
        # Set queue as two contatenated slices where the item at index is skipped.
        self[index[0], :] = self[index[0], :index[1]] + self[index[0], (index[1]+1):]
        
    
    # String conversion. Example call: "print(Schedule())" or "str(Schedule())"
    def __str__(self) -> str:
        
        return f"Cost: \t{self.get_cost():.2f}\n" + '\n'.join([f'M{machine_id + 1}: {self[machine_id, :]} ({self.get_cost_machine(machine_id):.2f}) ({len(self[machine_id, :])})' for machine_id in machine_ids])
    
    # Returns the solution in pandas.DataFrame form
    def to_dataframe(self) -> pd.DataFrame:
        """Returns the Solution converted to a pandas.DataFrame.

        Returns:
            pd.DataFrame: The Solution in dataframe form.
        """
        
        return pd.DataFrame.from_dict(
            self.order_queue,
        ).rename(columns ={
            machine_id: f"M{machine_id + 1}" 
            for machine_id in machine_ids
        })
    
    # Returns information about the current machine schedules on the order level.
    def get_machine_schedules(self) -> list[list[dict[str, float]]]:
        """Returns information about the current machine schedules on the order level such as order-processing starting- and ending-times

        Returns:
            list[list[dict[str, float]]]: A list (index: machine ID) containing lists (index: order queue index) of dictionaries containing the following data on the order execution:
            - "start": start time in time units since schedule start, 
            - "duration": processing duration in time units, 
            - "end": order processing completion time in time units since schedule start, 
            - "order": the order ID,
            - "cost": the penalty related to the deadline of the order.
        """
        
        machine_schedules = [[] for _ in machine_ids]
        
        for machine_id in machine_ids:
            
            t_current = 0
            last_order_id = None
        
            for order_id in self.order_queue[machine_id]:
                
                if (last_order_id != None):
                    t_current += setup_times.loc[last_order_id, order_id]
                
                processing_time = orders.loc[order_id, "surface"] / machine_speeds[machine_id]
                machine_schedules[machine_id] += [{
                    "start": t_current, 
                    "duration": processing_time, 
                    "end": t_current + processing_time, 
                    "order": order_id,
                    "cost": 0 if ((t_current + processing_time) > orders.loc[order_id, "deadline"]) else (t_current + processing_time) * orders.loc[order_id, "penalty"]
                }]
                
                t_current += processing_time
                last_order_id = order_id
                
        return machine_schedules
    
    # Draws the schedule using matplotlib.pyplot
    def draw(self) -> None:
        
        # Get machine order execution start times and durations
        machine_schedules = self.get_machine_schedules()
        
        # Determine schedule end-time
        schedule_completion_time = max([machine_schedule[-1]["end"] for machine_schedule in machine_schedules])
        
        # Declaring a figure "gnt"
        fig = plt.figure(figsize= (20,5))
        
        # Iterate over machines
        for machine_id in machine_ids:
            
            # Plot dividing line above current machine schedule
            if machine_id != 0:
                plt.plot(
                    (0, schedule_completion_time), 
                    (machine_id - 0.5, machine_id - 0.5),
                    "black"
                )
            
            # Plot order processing
            for job in machine_schedules[machine_id]:
                
                # Add rectangle representing the job
                plt.gca().add_patch(
                    patches.Rectangle(
                        (
                            job["start"], 
                            machine_id - 0.4
                        ), 
                        job["duration"], 
                        0.8,
                        fill = True, 
                        facecolor = color_names_by_index.loc[orders.loc[job["order"], "color"], "name"],
                        # edgecolor = 'red' if (job["cost"] > 0) else 'black',
                        edgecolor = 'black'
                        # linewith = 1
                    )
                )
                
                # Add text to center of rectangle
                plt.text(
                    job["start"] + job["duration"] / 2, 
                    machine_id,
                    f"O{job['order'] + 1}",
                    # f"O{job['order'] + 1}\n\n{job['cost']:.0f}",
                    horizontalalignment = "center",
                    verticalalignment = "center",
                ) 
        
        # Graph decoration
        plt.title(f"Schedule. Duration: {schedule_completion_time:.0f}. Cost: {self.get_cost():.0f}")
        plt.ylim(-0.5, len(machine_ids) - 0.5)
        plt.xlim(0, schedule_completion_time)
        plt.yticks(machine_ids, [f"M{id+1}" for id in machine_ids])
        plt.xlabel(f"Elapsed time units since start of schedule execution.")
        
        # Compose custom legend
        colors_patches = [
            mpatches.Patch(color=c, label=c) for c in color_names_by_index["name"]
        ]
        plt.legend(
            handles = colors_patches,
            title = "Paint colors"
        )
        # plt.grid()
        
        # Show graph
        plt.show()
        
        
    # Return the time at which the machine with the specified ID finishes it's order queue
    def get_finish_time(self, machine_id: int) -> float:
        
        # Processing time when starting on the current order
        t = 0
        last_order_id = None
        
        # Iterate over orders in queue
        for order_id in self.order_queue[machine_id]:
            
            # Add processing time to current time
            t += process_times.loc[order_id, machine_id]
            
            # Add setup time
            if (last_order_id != None):
                t += setup_times.loc[last_order_id, order_id]
            
            # Update last order ID
            last_order_id = order_id
            
        return t
    
    # Get the cost for the machine with the given machine_id
    def get_cost_machine(self, machine_id):
        # Processing time when starting on the current order
        t = 0
        last_order_id = None
            
        # Initialize penalty
        total_penalty = 0
            
        # Iterate over orders in queue
        for order_id in self.order_queue[machine_id]:
            
            # Add processing time to current time
            t += process_times.loc[order_id, machine_id]
            
            # Add setup time
            if (last_order_id != None):
                t += setup_times.loc[last_order_id, order_id]
            
            # Add penalty to total_cost
            if (t > orders.loc[order_id, 'deadline']):
                total_penalty += orders.loc[order_id, 'penalty'] * (t - orders.loc[order_id, 'deadline'])
            
            # Update last order ID
            last_order_id = order_id
        
        return total_penalty
    
    
    # Returns the total cost of the solution
    def get_cost(self) -> float:
        """Returns the total penalty for this schedule.

        Returns:
            float: The total penalty
        """
        
        # Initialize penalty
        total_penalty = 0
        
        # Iterate over machines in schedule
        for machine_id in machine_ids:
            
            total_penalty += self.get_cost_machine(machine_id)
        
        # Return total penalty
        return total_penalty
    
    
    # Returns a list of tuples containing the swaps
    def get_swaps(self) -> list[(tuple[int, int], tuple[int, int])]:
        
        # Get all indices of the orders.
        order_indices = [(machine_id, queue_index) for machine_id in machine_ids for queue_index in range(len(self.order_queue[machine_id]))]
        
        # Return all combinations of length 2.
        return list(iter.combinations(order_indices, 2))
    
    
    # Get item
    def get_item(self, index: tuple[int, int]):
        
        return self.order_queue[index[0]][index[1]]
    
    
    # Set item
    def set_item(self, index, value):
        self.order_queue[index[0]][index[1]] = value
    
    # Return a swapped copy of self
    def get_copy(self):
        return copy.deepcopy(self)

In [92]:
from abc import ABC, abstractmethod

# Move (Abstract Base Class) (https://docs.python.org/3/library/abc.html)
class Move(ABC):
    """Abstract base class for a move that turns one schedule into another."""

    # The body of an abstract method is only reachable when a subclass calls it through super().
    @abstractmethod
    def get_moved(self, schedule_old: "Schedule") -> "Schedule":
        """Returns a new schedule with this move applied; every concrete move must override this.

        Args:
            schedule_old (Schedule): The schedule to apply the move to.

        Raises:
            NotImplementedError: If a subclass delegates to this base implementation.
        """
        # NotImplementedError is the conventional signal for "must be overridden";
        # it is still an Exception, so existing `except Exception` handlers keep working.
        raise NotImplementedError("Abstract method used.")

    # @abstractmethod
    # def get_gain(self) -> float:
    #     raise Exception("Abstract method used.")

# Swap two orders by queue index.
class SwapOrders(Move):
    """Move that exchanges the orders at two (machine index, queue index) positions."""

    def __init__(self, queue_index_1: tuple[int, int], queue_index_2: tuple[int, int]):
        """Stores the two positions whose orders are to be exchanged.

        Args:
            queue_index_1 (tuple[int, int]): (machine index, queue index) of the first order.
            queue_index_2 (tuple[int, int]): (machine index, queue index) of the second order.
        """
        self.a = queue_index_1
        self.b = queue_index_2

    # Returns a swapped copy of the specified schedule.
    def get_moved(self, old: Schedule) -> Schedule:
        """Returns a copy of `old` in which the two orders have traded places; `old` is not modified."""

        # Copy the schedule itself. (Copying `self` would duplicate this move object,
        # which has no queues to swap.)
        new = copy.deepcopy(old)

        # Apply swap to new, always reading from the untouched original
        new[self.b] = old[self.a]
        new[self.a] = old[self.b]

        # Return swapped copy
        return new

    # # Get the change in cost resulting from this swap in a optimised way.
    # def get_gain(self, s: Schedule):

    #     return

# Move a single order to another position, possibly on another machine.
class MoveOrder(Move):
    """Move that takes one order out of its queue and inserts it at another position."""

    def __init__(self, old_index: tuple[int, int], new_index: tuple[int, int]):
        """Stores the source and target positions.

        Args:
            old_index (tuple[int, int]): (machine index, queue index) of the order to move.
            new_index (tuple[int, int]): (machine index, queue index) to insert it at. The
                queue index refers to the target queue as it is before the move: the order
                is placed directly in front of the order currently at that position.
        """
        self.old_index = old_index
        self.new_index = new_index

    # Returns a copy of the specified schedule with the order moved.
    def get_moved(self, schedule_old: Schedule) -> Schedule:
        """Returns a copy of `schedule_old` with the order moved; `schedule_old` is not modified."""

        # Create copy of schedule
        new = copy.deepcopy(schedule_old)

        old_machine, old_position = self.old_index
        new_machine, new_position = self.new_index

        # Take the order out first. Inserting first would shift the order's own position
        # whenever it moves to an earlier spot on the same machine, so the wrong element
        # would be removed afterwards.
        order_id = new.order_queue[old_machine].pop(old_position)

        # Removing the order moved everything behind it one place forward; compensate so
        # the target still refers to the original position in the queue.
        if new_machine == old_machine and new_position > old_position:
            new_position -= 1

        new.order_queue[new_machine].insert(new_position, order_id)

        # Return moved copy
        return new


# Swaps the queue of two machines.
class SwapQueues(Move):
    """Move that exchanges the complete order queues of two machines."""

    def __init__(self, machine_index_a: int, machine_index_b: int):
        """Stores the indices of the two machines whose queues are exchanged."""
        self.machine_a = machine_index_a
        self.machine_b = machine_index_b

    def get_moved(self, schedule_old: Schedule) -> Schedule:
        """Returns a copy of `schedule_old` with the two queues exchanged; `schedule_old` is not modified."""

        # Work on an independent copy
        swapped = copy.deepcopy(schedule_old)

        # Overwrite each queue in the copy with the other machine's queue from the original
        swapped[self.machine_a, :] = schedule_old[self.machine_b, :]
        swapped[self.machine_b, :] = schedule_old[self.machine_a, :]

        return swapped
    
    
# class ShuffleEach(Move):
    
#     def get_moved(self, schedule_old: Schedule) -> Schedule:
        
#         # Create copy of schedule
#         new = copy.deepcopy(schedule_old)
        
#         # Apply move (shuffle each queue)
#         new[self.new_index[0], self.new_index[1]:self.new_index[1]] = [schedule_old[self.old_index]]
#         del new[self.old_index]
        
#         # Return swapped copy
#         return new
        

# NOTE(review): ABC.register() declares a class as a "virtual subclass" of Move.
# SwapOrders and MoveOrder already inherit from Move in their class statements,
# so these two calls look redundant - confirm before removing.
Move.register(SwapOrders)
Move.register(MoveOrder)

__main__.MoveOrder

# Solution

In [40]:
def heuristic_constructive_simple() -> Schedule:
    """Builds a schedule by dealing the orders out over the machines one at a time.

    Orders are visited in the order of ``orders.index``. Each order is appended to the
    queue of the machine that currently holds the fewest orders. A fraction derived from
    the machine index is added to the queue length so that, on a tie, the machine with
    the lowest index is chosen.

    Returns:
        Schedule: The constructed schedule.
    """
    schedule = Schedule()
    machine_count = len(machine_speeds)

    def queue_length_with_tiebreak(machine_id):
        # Queue lengths are whole numbers, so a fraction below 1 only decides ties.
        return len(schedule[machine_id, :]) + machine_id / machine_count

    for order_id in orders.index.values:
        machine_id_next = min(machine_ids, key = queue_length_with_tiebreak)
        schedule[machine_id_next, :] += [order_id]

    return schedule

In [45]:
def heuristic_constructive_simple_2() -> Schedule:
    """Constructs a solution according to the following heuristic:
    1. Create an empty schedule (one empty order queue per machine).
    2. Take the next order in the order of ``orders.index`` and append it to the queue of the machine that currently finishes its queue earliest. The tiebreaking rule is that the machine with the lowest machine ID gets the order.
    3. Go to step 2 unless all orders are assigned.

    Returns:
        Schedule: The constructed schedule.
    """

    # Construct an empty schedule.
    schedule = Schedule()

    # For each order (in the order of the orders table).
    for order_id in orders.index.values:

        # Pick the machine that finishes earliest. The (finish time, machine ID) tuple breaks
        # exact ties on the machine ID only; adding a fraction of the ID to a fractional
        # finish time could let a later-finishing machine with a lower ID win.
        machine_id_next = min(
            machine_ids,
            key = lambda i: (schedule.get_finish_time(i), i)
        )

        # Add order to machine queue (the schedule is indexed by (machine, slice)).
        schedule[machine_id_next, :] += [order_id]

    return schedule

In [46]:
def heuristic_constructive_simple_3() -> Schedule:
    """Constructs a solution according to the following heuristic:
    1. Create an empty schedule (one empty order queue per machine).
    2. Take the unassigned order with the earliest deadline and append it to the queue of the machine that currently finishes its queue earliest. The tiebreaking rule is that the machine with the lowest machine ID gets the order.
    3. Go to step 2 unless all orders are assigned.

    Returns:
        Schedule: The constructed schedule.
    """

    # Construct an empty schedule.
    schedule = Schedule()

    # For each order (ordered by deadline ascending; equal deadlines keep their table order).
    for order_id in sorted(orders.index.values, key = lambda order_id: orders.loc[order_id, 'deadline']):

        # Pick the machine that finishes earliest. The (finish time, machine ID) tuple breaks
        # exact ties on the machine ID only; adding a fraction of the ID to a fractional
        # finish time could let a later-finishing machine with a lower ID win.
        machine_id_next = min(
            machine_ids,
            key = lambda i: (schedule.get_finish_time(i), i)
        )

        # Add order to machine queue (the schedule is indexed by (machine, slice)).
        schedule[machine_id_next, :] += [order_id]

    return schedule

In [47]:
def heuristic_constructive_random() -> Schedule:
    """Constructs a random solution:
    1. Create an empty schedule (one empty order queue per machine).
    2. Put the order IDs in a random sequence.
    3. Append each order, in that sequence, to the queue of a randomly chosen machine.

    Randomness comes from the seeded standard-library generator ``rng``.

    Returns:
        Schedule: The constructed schedule.
    """

    # Construct an empty schedule.
    schedule = Schedule()

    # Shuffle a private list of the order IDs. `orders.index.values` may expose the array
    # behind the orders index, so shuffling it in place could reorder (or fail on) that index.
    order_ids_shuffled = list(orders.index.values)
    rng.shuffle(order_ids_shuffled)

    # A plain list keeps rng.choice independent of how it tests a NumPy array for emptiness.
    machine_id_choices = list(machine_ids)

    for order_id in order_ids_shuffled:

        # Add order to the queue of a random machine (the schedule is indexed by (machine, slice)).
        schedule[rng.choice(machine_id_choices), :] += [order_id]

    return schedule

In [48]:
import math


def heuristic_improvement_best(initial: "Schedule") -> "Schedule":
    """Improves a schedule by repeatedly applying the best improving swap of two orders.

    In every round all swaps from ``get_swaps()`` are evaluated and the one with the lowest
    resulting cost is applied, until no swap lowers the cost any further. Progress is printed.

    Args:
        initial (Schedule): The schedule to improve. It is modified in place.

    Returns:
        Schedule: The same schedule object, now at a local optimum for the swap neighbourhood.
    """

    def apply_swap(schedule, swap):
        # Exchange the orders at the two positions; applying the same swap twice undoes it.
        index_a, index_b = swap
        order_a, order_b = schedule.get_item(index_a), schedule.get_item(index_b)
        schedule.set_item(index_a, order_b)
        schedule.set_item(index_b, order_a)

    current_schedule = initial

    print(f"Initial:")
    print(str(current_schedule) + "\n")

    while True:

        best_swap = None
        best_swap_cost = current_schedule.get_cost()

        for swap in current_schedule.get_swaps():

            # Try the swap in place, measure it, and undo it again.
            apply_swap(current_schedule, swap)
            swapped_cost = current_schedule.get_cost()
            apply_swap(current_schedule, swap)

            if (swapped_cost < best_swap_cost):
                best_swap = swap
                best_swap_cost = swapped_cost

        # Break the loop if no improving swap was found.
        if best_swap is None:
            break

        apply_swap(current_schedule, best_swap)
        print(f"Swapped: {best_swap}.")
        print(str(current_schedule) + "\n")

    return current_schedule

In [49]:
def heuristic_improvement_first(initial: "Schedule") -> "Schedule":
    """Improves a schedule by repeatedly applying the first improving swap of two orders.

    In every round the swaps from ``get_swaps()`` are evaluated in order and the first one
    that lowers the cost is applied, until no swap lowers the cost any further. Progress is printed.

    Args:
        initial (Schedule): The schedule to improve. It is modified in place.

    Returns:
        Schedule: The same schedule object, now at a local optimum for the swap neighbourhood.
    """

    def apply_swap(schedule, swap):
        # Exchange the orders at the two positions; applying the same swap twice undoes it.
        index_a, index_b = swap
        order_a, order_b = schedule.get_item(index_a), schedule.get_item(index_b)
        schedule.set_item(index_a, order_b)
        schedule.set_item(index_b, order_a)

    current_schedule = initial

    print(f"Initial:")
    print(str(current_schedule) + "\n")

    while True:

        first_improvement = None
        current_cost = current_schedule.get_cost()

        for swap in current_schedule.get_swaps():

            # Try the swap in place, measure it, and undo it again.
            apply_swap(current_schedule, swap)
            swapped_cost = current_schedule.get_cost()
            apply_swap(current_schedule, swap)

            if (swapped_cost < current_cost):
                first_improvement = swap
                break

        # Break the loop if no improving swap was found.
        if first_improvement is None:
            break

        apply_swap(current_schedule, first_improvement)
        print(f"Swapped: {first_improvement}.")
        print(str(current_schedule) + "\n")

    return current_schedule

In [50]:
# heuristic_improvement_best(heuristic_constructive_simple()).draw()
# heuristic_constructive_simple().draw()

In [None]:
# Run every constructive heuristic once and print the resulting schedules, separated by a blank line.
print('\n\n'.join(
    str(construct())
    for construct in (
        heuristic_constructive_simple,
        heuristic_constructive_simple_2,
        heuristic_constructive_simple_3,
        heuristic_constructive_random,
    )
))

In [52]:
# import matplotlib.pyplot as plt

# plt.hist([s.get_cost() for s in random_schedules], bins = 30)
# plt.show()

In [53]:
import itertools as iter

# Construct a schedule with the simple constructive heuristic and keep it in `sol`.
sol = heuristic_constructive_simple()

# TODO(review): the original comment here was cut off ("Gett") - intent unknown.

In [None]:
a = heuristic_constructive_simple()

# Build the swapped copy explicitly from a plain copy, using only get_copy(), get_item() and
# set_item(): exchange the orders at (machine 0, position 1) and (machine 1, position 6).
a_swapped = a.get_copy()
a_swapped.set_item((0, 1), a.get_item((1, 6)))
a_swapped.set_item((1, 6), a.get_item((0, 1)))
a_swapped