In [158]:
import numpy as np
import random
import math

# Define the data
scrubber_inventory = {
    'A1': [1, 1, 1225],
    'A2': [1, 1.5, 1575],
    'A3': [1, 2.5, 2475],
    'A4': [1, 2.5, 1750],
    'A5': [1, 3, 1750],
    'A6': [1, 3.5, 3150],
    'A7': [1, 2.5, 2700],
    'A8': [1, 2.5, 3150],
    'A9': [1, 3.5, 3150],
    'A10': [1, 4, 3825],
    'B1': [1, 1.5, 1400],
    'B2': [1, 2.5, 1720],
    'B3': [1, 3.5, 1720],
    'B4': [1, 3.5, 2200],
    'B5': [1, 5, 2200],
    'C1': [1, 8, 5600],
    'C2': [1, 3, 2970],
    'C3': [1, 3.5, 2460],
    'C4': [1, 4.5, 7740],
    'C5': [1, 5, 9000], 
    'D1': [1, 3, 7100],
    'D2': [1, 6, 4250]
}

scenario1 = {
    'Site 1': [23900, 7],
    'Site 2': [19200, 4],
    'Site 3': [16400, 6],
    'Site 4': [16200, 6],
    'Site 5': [33000, 3]
}

# Define the state representation
# Randomly assign scrubber machines to sites initially
def generate_initial_state(scrubber_inventory, scenario1):
    sites = list(scenario1.keys())
    state = {}
    for site in sites:
        available_scrubbers = list(scrubber_inventory.keys())
        state[site] = random.choice(available_scrubbers)
    return state, available_scrubbers

initial_state = generate_initial_state(scrubber_inventory, scenario1)


# Define the objective function
# Calculate the total cleaning time for all sites
def calculate_total_cleaning_time(state, scrubber_inventory, scenario1):
    total_cleaning_time = 0
    for site in state.keys():
        site_scrubber = state[site]
        site_cleaning_time = scenario1[site][scrubber_inventory[site_scrubber][0]]
        total_cleaning_time += site_cleaning_time
    return total_cleaning_time


# Define the neighborhood structure
# Swap scrubber machines between two sites
def generate_neighboring_state(state):
    sites = list(state.keys())
    site1, site2 = random.sample(sites, 2)
    state_copy = state.copy()
    state_copy[site1], state_copy[site2] = state_copy[site2], state_copy[site1]
    return state_copy


# Define the acceptance probability function
# Accept moves that improve the objective function unconditionally, accept moves that worsen the objective function with a certain probability
def acceptance_probability(delta, temperature):
    return math.exp(-delta / temperature)

# Define the main Simulated Annealing function
def simulated_annealing(scrubber_inventory, scenario1, initial_state, initial_temperature, cooling_rate, iterations):
    current_state = initial_state
    current_cost = calculate_total_cleaning_time(current_state, scrubber_inventory, scenario1)
    best_state = current_state
    best_cost = current_cost
    temperature = initial_temperature

    for i in range(iterations):
        # Generate a neighboring state
        neighboring_state = generate_neighboring_state(current_state)
        neighboring_cost = calculate_total_cleaning_time(neighboring_state, scrubber_inventory, scenario1)

        # Calculate the delta cost
        delta_cost = neighboring_cost - current_cost

        # Accept or reject the neighboring state
        if delta_cost < 0:
            # If the neighboring state is better, accept it
            current_state = neighboring_state
            current_cost = neighboring_cost
            if neighboring_cost < best_cost:
                best_state = neighboring_state
                best_cost = neighboring_cost
        else:
            # If the neighboring state is worse, accept it with a certain probability
            acceptance_prob = acceptance_probability(delta_cost, temperature)
            if random.uniform(0, 1) < acceptance_prob:
                current_state = neighboring_state
                current_cost = neighboring_cost

        # Update the temperature
        temperature *= cooling_rate

    return best_state, best_cost

# Define the hyperparameters
initial_temperature = 1000
cooling_rate = 0.99
iterations = 10000

# Generate an initial state
initial_state = generate_initial_state(scrubber_inventory, scenario1)

# Run the simulated annealing algorithm
best_state, best_cost = simulated_annealing(scrubber_inventory, scenario1, initial_state, initial_temperature, cooling_rate, iterations)

# Extract the scrubbers used in the best state
used_scrubbers_per_site = []
for site in best_state.keys():
    scrubber = best_state[site]
    used_scrubbers_per_site.append(scrubber)


# Print the results
print("Best State:", best_state)
print("Best Cost:", best_cost)
print("Used Scrubbers per Site:", used_scrubbers_per_site)


AttributeError: 'tuple' object has no attribute 'keys'

In [84]:
import pandas as pd
import numpy as np
import random
import math

In [150]:
data_scrubber = pd.read_csv("/Users/macdedieu/Desktop/data_optimizer/Dataset_1.csv", sep=';')
data_site = pd.read_csv("/Users/macdedieu/Desktop/data_optimizer/scenario_1.csv", sep=';')

In [143]:
data_scrubber['efficiency'] = data_scrubber['productivity'] / data_scrubber['time']

In [144]:
data_scrubber.head()

Unnamed: 0,scrubber,inventory,time,productivity,efficiency
0,A1,1,1.0,1225,1225.0
1,A2,1,1.5,1575,1050.0
2,A3,1,2.5,2475,990.0
3,A4,1,2.5,1750,700.0
4,A5,1,3.0,17500,5833.333333


In [145]:
data_site

Unnamed: 0,scenario,area,time
0,Site 1,23900,7
1,Site 2,19200,4
2,Site 3,16400,6
3,Site 4,16200,6
4,Site 5,33000,3


In [146]:
import random

def generate_random_assignment(data_scrubber, data_site):
    scrubber_list = data_scrubber['scrubber'].tolist()
    site_list = data_site['scenario'].tolist()
    assigned_scrubbers = {}
    scrubber_num = len(scrubber_list)
    
    # check if there are enough scrubbers to cover all sites
    min_scrubbers = sum(data_site['area']) // data_scrubber['productivity'].max()
    if scrubber_num < min_scrubbers:
        raise ValueError(f"Not enough scrubbers available. Need at least {min_scrubbers}, but only have {scrubber_num}.")
    
    random.shuffle(scrubber_list)
    random.shuffle(site_list)
    assignment = []
    
    for site_idx, site_name in enumerate(site_list):
        scrubber_combination = []
        total_prod = 0
        
        while scrubber_list:
            scrubber_index = scrubber_list.pop(0)
            # check if scrubber productivity exceeds site area
            total_prod = sum(data_scrubber.loc[data_scrubber['scrubber'].isin(scrubber_combination)]['productivity'])
            if total_prod >= data_site.loc[data_site['scenario'] == site_name]['area'].values[0]:
                diff = total_prod - data_site.loc[data_site['scenario'] == site_name]['area'].values[0]
                max_diff = 1000
                if diff > max_diff:
                    break 

            if data_scrubber.loc[data_scrubber['scrubber'] == scrubber_index]['inventory'].values[0] >= 1:
                data_scrubber.loc[data_scrubber['scrubber'] == scrubber_index, 'inventory'] -= 1
                scrubber_combination.append(scrubber_index)
        
        if not scrubber_combination:
            raise ValueError(f"Not enough scrubbers available to cover Site {site_idx+1}.")
            
        assignment.append(scrubber_combination)
    
    return assignment, site_list


In [151]:
assignment = generate_random_assignment(data_scrubber, data_site)
assignment

([['A3', 'A7', 'B2', 'B4', 'A8', 'C4'],
  ['B5', 'D1'],
  ['A1', 'A9', 'B1'],
  ['B3', 'A4', 'A5'],
  ['A2', 'C2', 'A6', 'D2']],
 ['Site 4', 'Site 1', 'Site 5', 'Site 2', 'Site 3'])

In [137]:
data_scrubber.loc[data_scrubber['scrubber'] == 'A1']['inventory']

0    1
Name: inventory, dtype: int64

In [155]:
def cost_function(assignment, data_scrubber, data_site):
    excess = 0
    for site_idx, scrubber_idxs in enumerate(assignment):
        total_prod = sum(data_scrubber.loc[data_scrubber['scrubber'].isin(scrubber_idxs)]['productivity'])
        area = data_site.loc[data_site['scenario'] == data_site['scenario'].unique()[site_idx]]['area'].values[0]
        excess += max(0, total_prod - area)
    return excess


In [156]:
import random
import math

def cost_function(assignment, data_scrubber, data_site):
    excess = 0
    for site_idx, scrubber_idxs in enumerate(assignment):
        total_prod = sum(data_scrubber.loc[data_scrubber['scrubber'].isin(scrubber_idxs)]['productivity'])
        area = data_site.loc[data_site['scenario'] == data_site['scenario'].unique()[site_idx]]['area'].values[0]
        excess += max(0, total_prod - area)
    return excess

def generate_new_assignment(assignment, data_scrubber, data_site):
    scrubber_list = data_scrubber['scrubber'].tolist()
    site_list = data_site['scenario'].tolist()
    new_assignment = assignment.copy()
    
    # select two random sites
    site1_idx, site2_idx = random.sample(range(len(site_list)), 2)
    site1, site2 = site_list[site1_idx], site_list[site2_idx]
    
    # select two random scrubbers from each site
    site1_scrubber_idxs = [i for i, s in enumerate(assignment) if site_list[i] == site1]
    site2_scrubber_idxs = [i for i, s in enumerate(assignment) if site_list[i] == site2]
    if len(site1_scrubber_idxs) < 2 or len(site2_scrubber_idxs) < 2:
        # cannot swap if either site has less than 2 scrubbers assigned
        return new_assignment
    
    scrubber1_idx1, scrubber1_idx2 = random.sample(site1_scrubber_idxs, 2)
    scrubber2_idx1, scrubber2_idx2 = random.sample(site2_scrubber_idxs, 2)
    
    # check if the new assignment satisfies scrubber and site constraints
    new_site1_scrubbers = [idx for idx in new_assignment[site1_idx] if idx not in [scrubber1_idx1, scrubber1_idx2]]
    new_site2_scrubbers = [idx for idx in new_assignment[site2_idx] if idx not in [scrubber2_idx1, scrubber2_idx2]]
    new_site1_scrubbers.extend([scrubber2_idx1, scrubber2_idx2])
    new_site2_scrubbers.extend([scrubber1_idx1, scrubber1_idx2])
    
    total_prod1 = sum(data_scrubber.loc[data_scrubber['scrubber'].isin(new_site1_scrubbers)]['productivity'])
    total_prod2 = sum(data_scrubber.loc[data_scrubber['scrubber'].isin(new_site2_scrubbers)]['productivity'])
    area1 = data_site.loc[data_site['scenario'] == site1]['area'].values[0]
    area2 = data_site.loc[data_site['scenario'] == site2]['area'].values[0]
    invalid_assignment = False
    
    if total_prod1 > area1 or total_prod2 > area2:
        # new assignment violates site constraints
        invalid_assignment = True
    
    for scrubber_idx in new_site1_scrubbers:
        if data_scrubber.loc[data_scrubber['scrubber'] == scrubber_idx]['inventory'].values[0] < 1:
            # new assignment violates scrubber constraints
            invalid_assignment = True
            
    for scrubber_idx in new_site2_scrubbers:
        if data_scrubber.loc[data_scrubber['scrubber'] == scrubber_idx]['inventory'].values[0] < 1:
            # new assignment violates scrubber constraints
            invalid_assignment = True
            
    if not invalid_assignment:
        # update assignment if new assignment is valid
        new_assignment[site1_idx] = new_site1_scrubbers
        new_assignment[site2_idx] = new_site2_scrubbers

    return new_assignment

def simulated_annealing(data_scrubber, data_site, T=1, alpha=0.99, stopping_T=0.000001, stopping_iter=100000):
    # initialize with a random assignment
    assignment = []
    scrubber_list = data_scrubber['scrubber'].tolist()
    site_list = data_site['scenario'].tolist()
    for site in site_list:
        site_scrubbers = random.sample(scrubber_list, random.randint(2, len(scrubber_list)))
        assignment.append([scrubbers for scrubbers in site_scrubbers])

    # keep track of the best assignment
    best_assignment = assignment
    best_cost = cost_function(assignment, data_scrubber, data_site)

    # simulated annealing
    iteration = 0
    while T > stopping_T and iteration < stopping_iter:
        new_assignment = generate_new_assignment(assignment, data_scrubber, data_site)
        new_cost = cost_function(new_assignment, data_scrubber, data_site)
        delta_cost = new_cost - best_cost
        
        if delta_cost < 0 or math.exp(-delta_cost/T) > random.uniform(0, 1):
            assignment = new_assignment
            
            if new_cost < best_cost:
                best_assignment = new_assignment
                best_cost = new_cost
        
        T *= alpha
        iteration += 1

    return best_assignment, best_cost


In [157]:
best_assignement, best_cost = simulated_annealing(data_scrubber, data_site)

print(best_assignement)
print(best_cost)

[['C5', 'B1', 'A1', 'C4', 'A6', 'D1', 'A2', 'A3', 'C2', 'C3', 'B3', 'B5', 'B2', 'A8', 'A4', 'D2', 'A10', 'A5', 'C1', 'A7', 'A9', 'B4'], ['A5', 'B5', 'A4', 'C1', 'A7', 'C5'], ['A8', 'B1', 'B4', 'A7', 'C5', 'C1', 'A5', 'D2', 'B3', 'A6', 'A1', 'B2', 'A2', 'C4'], ['C3', 'A7', 'A10', 'A1', 'C2', 'C5', 'B2', 'D2', 'B1', 'A4', 'A3', 'A9'], ['A1', 'B1', 'C2', 'A4', 'A9', 'A10', 'B3', 'A7', 'B4', 'D1', 'A2', 'B2', 'A3', 'B5', 'A6', 'C3', 'C5']]
364235
