In [1]:
import os
import pandas as pd
import numpy as np
import random
from deap import base, creator, tools, algorithms

np.random.seed(42)  # for reproducibility
# Get the current working directory
current_directory = os.getcwd()

# Construct the relative path to prediction RUL file
rul_filename = "RUL_consultancy_predictions_A3-2.csv"
rul_path = os.path.join(current_directory, rul_filename)

# Read the CSV file
rul_df = pd.read_csv(rul_path, delimiter=';')
print(rul_df)


    RUL   id
0   135    1
1   125    2
2    63    3
3   100    4
4   103    5
..  ...  ...
95  140   96
96  109   97
97   87   98
98  127   99
99   24  100

[100 rows x 2 columns]


In [2]:
# Define constants
M = len(rul_df) # number of engines
G = 4 # Total number of teams (2 type A, 2 type B)
T = 30 # Planning horizon in days
MAX_DAILY_COST = 250

In [3]:
# Define maintenance times for teams A and B
maintenance_duration_a = [4 if i < 20 else 3 if 20 <= i < 55 else 2 if 55 <= i < 80 else 8 for i in range(1, M + 1)]
maintenance_duration_b = [time_a + 1 if i < 25 else time_a + 2 if 25 <= i < 70 else time_a + 1 for i, time_a in enumerate(maintenance_duration_a, start=1)]


# Define engine costs
engine_costs = [4 if i < 21 else 3 if 21 <= i < 31 else 2 if 31 <= i < 46 else 5 if 46 <= i < 81 else 6 for i in range(1, M + 1)]



In [4]:

# # Penalty cost function
# def penalty_cost(engine_cost, days_past_due):
#     return min(engine_cost * (days_past_due)**2, MAX_DAILY_COST)


def generate_random_schedule(T):
    individual = []
    # Track the availability of each team (A and B) on each day
    team_availability = {day: {'A': 0, 'B': 0} for day in range(1, T + 1)}
    
    # Apply filter to  allocate teams to engines that have a predicted safety due date of less than T = 30.

    filtered_df = rul_df[rul_df['RUL'] <= T].copy()
    engine_ids = list(filtered_df['id'])
    random.shuffle(engine_ids)
    
    # print("Engine IDs:", engine_ids)
    
    # Randomly assign teams to engines within the planning horizon
    for engine_id in engine_ids:
        # print("\nProcessing Engine ID:", engine_id)
    
        team_type = np.random.choice(['A', 'B'])  # Assume team type is A for this example
        # print("Selected Team Type:", team_type)
    
        # Determine maintenance duration based on team type
        maintenance_days = maintenance_duration_a[engine_id - 1] if team_type == 'A' else maintenance_duration_b[engine_id - 1]
        # print("Maintenance Days Required:", maintenance_days)
    
        valid_start_dates = []

        # Search for valid start dates with continuous available days for maintenance duration
        for start_day in range(1, T - maintenance_days + 2):
            is_valid = True
            for day in range(start_day, start_day + maintenance_days):
                if team_availability[day][team_type] >= 2:  # Check if max team limit is exceeded on any day
                    is_valid = False
                    break
            if is_valid:
                valid_start_dates.append(start_day)
     
        # print("Team Availability:", team_availability)
        # print("Valid Start Dates:", valid_start_dates)
    
        
        if not valid_start_dates:
            # print("No valid start date found. Skipping this engine.")
            continue
           
                    
        start_day = np.random.choice(valid_start_dates)
        end_day = start_day + maintenance_days - 1
    
        # print("Selected Start Date:", start_date)
        # print("Selected End Date:", end_date)
    
        # Update team availability for the selected team and days
        for day in range(start_day, end_day + 1):
            team_availability[day][team_type] += 1
    
        # print("Team Availability Updated:", team_availability)
    
    
        # RUL = filtered_df.loc[filtered_df['id'] == engine_id, 'RUL'].values[0]
        # safety_due_date = RUL
        # # print("Safety Due Date:", safety_due_date)
        # 
        # 
        # 
        # if end_date > safety_due_date:
        #     penalty_cost_value = penalty_cost(engine_costs[engine_id - 1], end_date - safety_due_date)
        # else:
        #     penalty_cost_value = 0
    
        # print("Penalty Cost Value:", penalty_cost_value)
    
        # Append to individual list
        individual.append((engine_id, team_type, start_day))
    return individual


print(generate_random_schedule(T))

# # Create DataFrame from individual list
# schedule_df = pd.DataFrame(generate_random_schedule(T))
# 
# print("\nFinal Schedule DataFrame:")
# print(schedule_df.sort_values(by='Start_date'))
# print("Number of Scheduled Maintenance Tasks:", len(schedule_df))


[(42, 'A', 20), (37, 'A', 15), (100, 'A', 8), (35, 'A', 24), (82, 'A', 18), (91, 'A', 7), (56, 'A', 27), (61, 'B', 21), (68, 'B', 8), (24, 'B', 3), (76, 'B', 21), (34, 'B', 12), (36, 'B', 6), (64, 'B', 26), (49, 'A', 1), (20, 'B', 24), (31, 'B', 14), (66, 'B', 1), (41, 'A', 28), (77, 'B', 28)]


In [5]:
# Define the problem as a minimization problem
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMin)


# Initialization functions for individual and population.
def init_individual(T):
    return creator.Individual(generate_random_schedule(T))

def init_population(size, T):
    return [init_individual(T) for _ in range(size)]

In [6]:

# Define the fitness function
def calculate_penalty(engine_id, start_day, team_type):
    engine_index = engine_id - 1
    RUL = rul_df.loc[engine_index, 'RUL']
    safety_due_date = RUL
    maintenance_days = maintenance_duration_a[engine_index] if team_type == 'A' else maintenance_duration_b[engine_index]

    end_day = start_day + maintenance_days - 1
    penalty = 0

    if end_day> safety_due_date:
        overdue_days = end_day - safety_due_date
        penalty = min(MAX_DAILY_COST, engine_costs[engine_index] * overdue_days ** 2)

    return penalty


def evaluate(individual):
    total_penalty = 0
    for engine_id, team_type, start_day in individual:
        engine_index = engine_id - 1
        maintenance_days = maintenance_duration_a[engine_index] if team_type == 'A' else maintenance_duration_b[engine_index]
        end_day = start_day + maintenance_days - 1
        if end_day > T:
            return (float('inf'),)  # Invalid schedule, penalty is infinity
        penalty = calculate_penalty(engine_id, start_day, team_type)
        total_penalty += penalty
        print(total_penalty)

    return (total_penalty,)


# # Mutation function
# def mutate_individual(individual, indpb=0.05):
#     for i in range(len(individual)):
#         if random.random() < indpb:
#             individual[i] = (individual[i][0], random.choice(["A","B"]))
#     return repair_individual(individual)

In [7]:
# Register genetic algorithm operators
toolbox = base.Toolbox()
toolbox.register("individual", init_individual)
toolbox.register("population", init_population)
toolbox.register("evaluate", evaluate)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("select", tools.selTournament, tournsize=3)
# Create a population
population = toolbox.population(size=100, T=T)


def main():
    random.seed(64)

    # Set genetic algorithm parameters
    CXPB = 0.9
    NGEN = 50

    # Evaluate the entire population
    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit

    # Evolution process
    for gen in range(NGEN):
        # Select the next generation individuals
        offspring = toolbox.select(population, len(population))
        # Clone the selected individuals
        offspring = list(map(toolbox.clone, offspring))

        # Apply crossover and mutation
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < CXPB:
                toolbox.mate(child1, child2)
                del child1.fitness.values
                del child2.fitness.values


        # Evaluate the individuals with an invalid fitness
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit

        # Replace population with the next generation
        population[:] = offspring

    # Gather all the fitnesses in the final population and print the stats
    fits = [ind.fitness.values[0] for ind in population]

    print("Min:", min(fits))
    print("Max:", max(fits))
    print("Avg:", sum(fits) / len(population))
    print("Std:", np.std(fits))

    # Display the best individuals
    best_individuals = tools.selBest(population, k=10)  # Select top 10 best individuals
    print("\nBest individuals:")
    for ind in best_individuals:
        print(ind, ind.fitness.values)
    return 

if __name__ == "__main__":
    main()

0
80
176
176
176
176
176
176
426
676
926
926
926
926
926
926
926
245
245
245
345
345
345
345
595
645
743
793
793
973
973
1018
1042
1042
1042
250
250
250
250
250
250
500
500
572
577
705
711
961
1211
1211
1211
1211
1456
1464
1714
1714
250
250
250
250
250
500
500
750
750
750
750
750
1000
1180
1430
1436
1686
1718
1720
1720
1720
1720
0
0
0
0
0
20
270
270
470
720
720
845
845
872
872
1122
1122
1372
1408
1650
1650
0
0
0
0
250
250
330
348
402
402
402
652
702
702
702
738
980
980
1230
1480
1480
0
0
36
36
286
286
411
661
661
661
679
679
699
699
824
824
1074
1074
1324
0
0
0
0
250
270
270
520
520
520
520
565
565
565
565
815
0
0
250
346
346
596
596
846
846
846
870
1120
1370
1370
1370
1370
1402
1402
1420
1420
1452
245
277
277
277
277
527
527
777
777
1027
1027
1027
1027
1027
1072
1322
1572
1644
1644
1894
0
0
128
128
178
178
178
178
178
428
428
678
680
930
930
930
1180
1180
1186
0
0
72
72
72
322
572
822
822
822
822
822
822
1002
1252
1350
1600
1600
1850
1850
1850
250
400
400
400
650
650
900
1150
1155
115

In [8]:
best_ind=[(91, 'B', 4), (31, 'B', 7), (66, 'A', 6), (66, 'B', 2), (66, 'A', 8), (20, 'A', 6), (41, 'A', 8), (24, 'B', 13), (64, 'A', 4), (24, 'B', 21), (56, 'A', 5), (40, 'A', 25), (90, 'B', 5), (49, 'A', 3), (77, 'B', 17), (61, 'A', 20), (34, 'A', 3), (49, 'A', 12), (42, 'A', 3)]

df = pd.DataFrame(best_ind, columns=['Engine_id', 'Team', 'Start_Date'])

In [9]:
df.sort_values(by='Start_Date', inplace=True)
print(df)

    Engine_id Team  Start_Date
3          66    B           2
18         42    A           3
16         34    A           3
13         49    A           3
8          64    A           4
0          91    B           4
10         56    A           5
12         90    B           5
2          66    A           6
5          20    A           6
1          31    B           7
6          41    A           8
4          66    A           8
17         49    A          12
7          24    B          13
14         77    B          17
15         61    A          20
9          24    B          21
11         40    A          25


## Other Drafts. You can see a repair function for invalid individuals.

In [None]:

# Penalty cost function
def penalty_cost(engine_cost, days_past_due):
    return min(engine_cost * (days_past_due)**2, MAX_DAILY_COST)


def generate_random_schedule(T):
    individual = []
    # Track the availability of each team (A and B) on each day
    team_availability = {day: {'A': 0, 'B': 0} for day in range(1, T + 1)}
    
    # Apply filter to allocate allocate teams to engines that have a predicted safety due date of less than T = 30.

    filtered_df = rul_df[rul_df['RUL'] <= T].copy()
    engine_ids = list(filtered_df['id'])
    random.shuffle(engine_ids)
    
    print("Engine IDs:", engine_ids)
    
    # Randomly assign teams to engines within the planning horizon
    for engine_id in engine_ids:
        print("\nProcessing Engine ID:", engine_id)
    
        team_type = np.random.choice(['A', 'B'])  # Assume team type is A for this example
        print("Selected Team Type:", team_type)
    
        # Determine maintenance duration based on team type
        maintenance_days = maintenance_duration_a[engine_id - 1] if team_type == 'A' else maintenance_duration_b[engine_id - 1]
        print("Maintenance Days Required:", maintenance_days)
    
        valid_start_dates = []

        # Search for valid start dates with continuous available days for maintenance duration
        for start_date in range(1, T - maintenance_days + 2):
            is_valid = True
            for day in range(start_date, start_date + maintenance_days):
                if team_availability[day][team_type] >= 2:  # Check if max team limit is exceeded on any day
                    is_valid = False
                    break
            if is_valid:
                valid_start_dates.append(start_date)
     
        print("Team Availability:", team_availability)
        print("Valid Start Dates:", valid_start_dates)
    
        
        if not valid_start_dates:
            print("No valid start date found. Skipping this engine.")
            continue
           
                    
        start_date = np.random.choice(valid_start_dates)
        end_date = start_date + maintenance_days - 1
    
        print("Selected Start Date:", start_date)
        print("Selected End Date:", end_date)
    
        # Update team availability for the selected team and days
        for day in range(start_date, end_date + 1):
            team_availability[day][team_type] += 1
    
        print("Team Availability Updated:", team_availability)
    
    
        RUL = filtered_df.loc[filtered_df['id'] == engine_id, 'RUL'].values[0]
        safety_due_date = RUL
        print("Safety Due Date:", safety_due_date)
        
    
    
        if end_date > safety_due_date:
            penalty_cost_value = penalty_cost(engine_costs[engine_id - 1], end_date - safety_due_date)
        else:
            penalty_cost_value = 0
    
        print("Penalty Cost Value:", penalty_cost_value)
    
        # Append to individual list
        individual.append({
            'Engine_id': engine_id,
            'Team': team_type, 
            'Safety_due_date': safety_due_date,
            'Start_date': start_date,
            'End_date': end_date,
            'Penalty_cost': penalty_cost_value
        })
    return individual

# Create DataFrame from individual list
schedule_df = pd.DataFrame(generate_random_schedule(T))

print("\nFinal Schedule DataFrame:")
print(schedule_df.sort_values(by='Start_date'))
print("Number of Scheduled Maintenance Tasks:", len(schedule_df))


In [None]:
import random
import os
import pandas as pd
import numpy as np
from deap import base
from deap import creator
from deap import tools

In [None]:

# Define constants
M = len(rul_df)  # number of engines

G = 4  # Total number of teams (2 type A, 2 type B)
TEAM_A = ['T1', 'T3']
TEAM_B = ['T2', 'T4']
T = 30  # Planning horizon in days
MAX_DAILY_COST = 250
# Define maintenance times for teams A and B
# Maintenance durations
maintenance_duration_a = [4 if i < 20 else 3 if 20 <= i < 55 else 2 if 55 <= i < 80 else 8 for i in range(M)]
maintenance_duration_b = [time_a + 1 if i < 25 else time_a + 2 if 25 <= i < 70 else time_a + 1 for i, time_a in enumerate(maintenance_duration_a)]

# Engine costs
engine_costs = [4 if i < 20 else 3 if 20 <= i < 30 else 2 if 30 <= i < 45 else 5 if 45 <= i < 80 else 6 for i in range(M)]

print(f'Engine costs: {engine_costs}')
print(f'Maintenance Duration for Team A: {maintenance_duration_a}')
print(f'Maintenance Duration for Team B: {maintenance_duration_b}')



In [None]:
# Define the problem as a minimization problem
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMin)


# Initialization functions for individual and population.
def init_individual():
    ''' Assigns teams to engines randomly '''
    return creator.Individual([(engine_id, random.choice(TEAM_A + TEAM_B)) for engine_id in rul_df.index])

def init_population(size):
    return [init_individual() for _ in range(size)]


# Define the fitness function
def calculate_penalty(engine_id, start_day, team_type):
    Rj = rul_df.loc[engine_id, 'RUL']
    ts = Rj - 1
    if team_type == 'A':
        maintenance_time = maintenance_duration_a[engine_id]
    else:
        maintenance_time = maintenance_duration_b[engine_id]
    
    finish_day = start_day + maintenance_time
    penalty = 0
    
    if finish_day > ts:
        overdue_days = finish_day - ts
        penalty = min(MAX_DAILY_COST, engine_costs[engine_id] * overdue_days ** 2)
    
    return penalty


def evaluate(individual):
    total_penalty = 0
    team_end_times = [0] * G  # End times for all teams

    for engine_id, team in individual:
        team_index = int(team[1]) - 1  # T1 -> 0, T2 -> 1, etc.
        team_type = 'A' if team in TEAM_A else 'B'
        start_day = team_end_times[team_index]
        
        maintenance_time = maintenance_duration_a[engine_id] if team_type == 'A' else maintenance_duration_b[engine_id]
        finish_day = start_day + maintenance_time
        
        if finish_day > T:
            return (float('inf'),)  # Invalid schedule, penalty is infinity

        penalty = calculate_penalty(engine_id, start_day, team_type)
        total_penalty += penalty

        team_end_times[team_index] = finish_day

    return (total_penalty,)


# Repair function to fix invalid individuals
def repair_individual(individual):
    team_end_times = [0] * G
    for i, (engine_id, team) in enumerate(individual):
        team_index = int(team[1]) - 1
        team_type = 'A' if team in TEAM_A else 'B'
        maintenance_time = maintenance_duration_a[engine_id] if team_type == 'A' else maintenance_duration_b[engine_id]
        start_day = team_end_times[team_index]
        # Find a new start day
        if start_day + maintenance_time > T:
            new_team = random.choice(TEAM_A + TEAM_B)
            individual[i] = (engine_id, new_team)
            new_team_index = int(new_team[1]) - 1
            new_team_type = 'A' if new_team in TEAM_A else 'B'
            new_maintenance_time = maintenance_duration_a[engine_id] if new_team_type == 'A' else maintenance_duration_b[engine_id]
            team_end_times[new_team_index] = max(team_end_times[new_team_index], 0) + new_maintenance_time
        else:
            team_end_times[team_index] = start_day + maintenance_time

    return individual


# Mutation function
def mutate_individual(individual, indpb=0.05):
    for i in range(len(individual)):
        if random.random() < indpb:
            individual[i] = (individual[i][0], random.choice(TEAM_A + TEAM_B))
    return repair_individual(individual)


In [None]:
# Register genetic algorithm operators
toolbox = base.Toolbox()
toolbox.register("individual", init_individual)
toolbox.register("population", init_population)
toolbox.register("evaluate", evaluate)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", mutate_individual)
toolbox.register("select", tools.selTournament, tournsize=3)

In [None]:
# Create a population
population = toolbox.population(size=100)

In [None]:
def main():
    random.seed(64)
    
    # Set genetic algorithm parameters
    CXPB, MUTPB, NGEN = 0.5, 0.2, 50
    
    # Evaluate the entire population
    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit
    
    # Evolution process
    for gen in range(NGEN):
        # Select the next generation individuals
        offspring = toolbox.select(population, len(population))
        # Clone the selected individuals
        offspring = list(map(toolbox.clone, offspring))
        
        # Apply crossover and mutation
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < CXPB:
                toolbox.mate(child1, child2)
                del child1.fitness.values
                del child2.fitness.values
        
        for mutant in offspring:
            if random.random() < MUTPB:
                toolbox.mutate(mutant)
                del mutant.fitness.values
        
        # Evaluate the individuals with an invalid fitness
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit
        
        # Replace population with the next generation
        population[:] = offspring
    
    # Gather all the fitnesses in the final population and print the stats
    fits = [ind.fitness.values[0] for ind in population]
    
    print("Min:", min(fits))
    print("Max:", max(fits))
    print("Avg:", sum(fits) / len(population))
    print("Std:", np.std(fits))
    
    # Display the best individuals
    best_individuals = tools.selBest(population, k=10)  # Select top 10 best individuals
    print("\nBest individuals:")
    for ind in best_individuals:
        print(ind, ind.fitness.values)
        
if __name__ == "__main__":
    main()