In [4]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime

In [47]:
# DATA EXTRACTION
df_BOM = pd.read_csv("sample_revised.csv")
display(df_BOM.head())
print(df_BOM['workcenter'].unique())
df_machine = pd.DataFrame({
    'workcenter': ["WC#1", "WC#2", "WC#3"],
    'M1': [3, 2, 3],
    'M2': [2, 1, 2],
    'M3': [1, 2, 1],
    'M4': [1, 1, 1],
    'M5': [1, 2, 3]
})
display(df_machine)

class WorkCenter:
    def __init__(self, id, dict_machines={}):
        self.id = id
        self.machine = dict_machines
        # dict_machines = {'machine 1': quantity of machine 1}

print(df_machine.columns[1:])
factory = []
for idx, row in df_machine.iterrows():
    workcenter = row['workcenter']
    dict_machines = {}
    for machine in (df_machine.columns[1:]): 
        dict_machines[machine] = row[machine]
    factory.append(WorkCenter(workcenter, dict_machines=dict_machines))

Unnamed: 0,operation,predecessor_operations,end_product,due_date,processing_time,workcenter,machine
0,A,[],0,,3,WC#2,1
1,B,[],0,,2,WC#1,2
2,C,"[""A""]",0,,1,WC#1,3
3,D,"[""B""]",0,,3,WC#2,2
4,E,"[""C"", ""D""]",0,,1,WC#2,4


['WC#2' 'WC#1' 'WC#3']


Unnamed: 0,workcenter,M1,M2,M3,M4,M5
0,WC#1,3,2,1,1,1
1,WC#2,2,1,2,1,2
2,WC#3,3,2,1,1,3


Index(['M1', 'M2', 'M3', 'M4', 'M5'], dtype='object')


In [64]:
class Operation:
    def __init__(self, id, processing_time, workcenter, machine, due_date=None, successors=None):
        self.id = id
        # Precedence constraints
        self.successors = successors if successors else []

        # Workcenter and machine information
        self.workcenter = workcenter
        self.machine = machine

        # Times
        self.processing_time = processing_time
        self.start_time = None
        self.end_time = None
        self.due_date = None if due_date != due_date else due_date

        # Other
        self.scheduled = False

def find_critical_path(operations, operation_ids):
    """
    Finds the critical path among the feasible operations.
    Inputs:
        - operations                    : dictionary {operation_id: Operation()}, a dictionary of all operations.
        - operation_ids                 : list[operation_id],  a list of operation IDs that are currently feasible.
    Output:
        - (critical_path, path_length)  : tuple, containing the critical path (list of operation IDs) and the length of the critical path.
    """

    paths = []
    def dfs(operation_id, current_path, current_length):
        """
        Performs depth-first-search in a given operation network using recursion.
        Inputs: 
            - operation_id          : str, id of the operation to begin DFS at
            - current_path          : list [str], The path of operation IDs visited so far
            - current_length        : float - The cumulative processing time along the current path
        Output:
            - None - The function modifies the 'paths' list in place, adding tuples of (path, length) for each terminal path found.
        Explanation:
            - This function recursively traverses the operation network starting from the given operation_id.
            - It explores all successors of the current operation, appending the current operation to the path and adding its processing time to the cumulative length.
            - If an operation has no successors, it adds the complete path and its length to the 'paths' list.
        """

        operation = operations[operation_id]
        # If the operation has a successor, add to path
        # Otherwise, perform forward recursive DFS on each of the operation's successor to explore all possible paths
        if not operation.successors:
            paths.append((current_path + [operation_id], current_length + operation.processing_time))
        else:
            for succ_id in operation.successors:
                dfs(succ_id, current_path + [operation_id], current_length + operation.processing_time)

    # For every operation, perform DFS
    for op_id in operation_ids:
        dfs(op_id, [], 0)
    return max(paths, key=lambda x: x[1])

def schedule_operations(operations, due_dates, factory):
    """
    Solves the assembly scheduling problem (ASP) using the Longest End Time Scheduling Algorithm (LETSA).
    Inputs:
        - operations            : dictionary {operation_id: Operation()}, a dictionary of all operations.
        - due_dates             : dictionary {operation_id: int}, a dictionary of due dates for each operation.
        - factory               : list [WorkCenter()], a list of WorkCenter objects, containing machine information and availability
    Output:
        - scheduled_operations  : list [Operation()], a list of Operation objects with start and end time schedules.
    """

    scheduled_operations = []
    # [[Step 4]]
    while True:
        # ================================================================================================================
        #  [[4.0]] Feasible operations = every operation that is (1) not scheduled, and (2) has all successors scheduled
        # ================================================================================================================
        feasible_operations = [op_id for op_id, op in operations.items() if not op.scheduled and all(operations[succ_id].scheduled for succ_id in op.successors)]
        if not feasible_operations:
            break # terminate if all operations have been scheduled

        # ===================================================================
        #  [[4.1 - 4.3]] Compute critical path only for feasible operations
        # ===================================================================
        critical_path, _ = find_critical_path(operations, feasible_operations)
        operation_id = critical_path[0]
        operation = operations[operation_id]

        # =====================================================================
        # [[4.4]] Set completion/end time of the selected operation as
        #         (ii) the start time of the successor, if a successor exists
        #         (ii) the project deadline, otherwise 
        # =====================================================================
        if operation.successors:
            tentative_end_time = min(operations[succ_id].start_time for succ_id in operation.successors)
        else:
            tentative_end_time = due_dates[operation.id]

        # ============================================================================
        #   [[4.5]] For each identical machine incuded in the required work-center 
        # ============================================================================
        


        # Set start time of selected operation
        operation.start_time = operation.end_time - operation.processing_time
        operation.scheduled = True
        scheduled_operations.append(operation)
        
    return scheduled_operations

def plot_gantt_chart(scheduled_operations):
    """
    Plots a Gantt chart for the scheduled operations.
    Inputs:
        - scheduled_operations      : list Operation(), a list of scheduled Operation objects.
    Output:
        - None 
        - Displays a Gantt chart.
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    for op in scheduled_operations:
        ax.barh(op.id, op.processing_time, left=op.start_time, color='skyblue')
        ax.text(op.start_time + op.processing_time / 2, op.id, f'{op.start_time}-{op.end_time}', ha='center', va='center', color='black')

    ax.set_xlabel('Time')
    ax.set_ylabel('Operations')
    ax.set_title('Gantt Chart for Assembly Scheduling')
    plt.grid(True)
    plt.show()

def load_operations_from_df(df):
    """
    Loads operations from a DataFrame and creates Operation objects.    
    Inputs:
        - df: pandas DataFrame - A DataFrame containing BOM information with columns 
          ['part', 'operation', 'operation_id', 'components_required', 'processing_time', 'workcenter', 'predecessor_operations'].
    Output:
        - operations: dictionary, {operation_id: Operation()} - A dictionary of Operation objects.
    """
    operations = {}
    for idx, row in df.iterrows():
        operation_id = row['operation']
        successors = row['predecessor_operations']
        due_date = row['due_date']  
        operations[operation_id] = Operation(operation_id, row['processing_time'], row['workcenter'], row['machine'], due_date=due_date, successors=successors)
        
    return operations

def load_due_dates(df):
    """
    Generates due dates for each operation.    
    Inputs:
        - df: pandas DataFrame - A DataFrame containing BOM information.
    Output:
        - due_dates: dictionary, {operation_id: int} - A dictionary of due dates for each operation.
    """
    due_dates = {}
    for idx, row in df.iterrows():
        operation_id = row['operation']
        # Example due date assignment, this should be adjusted as per actual due dates
        due_dates[operation_id] = 20 - idx  # Example: descending due dates
    return due_dates


In [66]:
from math import isnan
operations = load_operations_from_df(df_BOM)
for key, operation in operations.items(): 
    # print(operation)
    print(operation.id)
    print(operation.due_date)
    print("")

A
None

B
None

C
None

D
None

E
None

F
None

G
None

H
None

I
50.0

J
None

K
None

L
None

M
None

N
None

O
80.0

P
None

Q
None

R
None

S
None

T
30.0

U
None



In [60]:
# value = float('nan') 
value = 3
if value != value: 
    print("Value is NaN") 
else: 
    print("Value is not NaN")


Value is not NaN
