# Project: Electric Vehicle Charging Scheduling Problem (EVCS)
## First deliverable
## Prescriptive Analytics: Heuristics for Decision Making
### Wilmar Calderón - 201630701

The Electric Vehicle Charging Scheduling (EVCS) problems aims to determine the optimal sequence of charging $n$ vehicles in $m$ chargers. The EVCS can be formulated with multiple objectives as reducing cost or maximizing the charge, which are the most common objectives for the problem. This implementation describes a constructive Heuristic formulation to obtain an initial and feasible solution for the EVCS. The proposed implementation considers two approaches, first, the Earliest Departure First (EDF) and the First Come First Served (FCFS) approach. the former, prioritizes the vehicles that have the nearest departure time to schedule its charge first, the latter considers that the first vehcile that arrives should be attended. Implementation of the Heuristic is described below.

In [142]:
import json
import pandas as pd
from typing import Dict, Tuple
import gurobipy as gp
from gurobipy import GRB
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from matplotlib.colors import Normalize
import time
import random
import copy

In [143]:
# --- Constants and Configuration (from your notebook) ---
STEP_DURATION = 0.05  # Time spacing: Model parameter (fraction of an hour)

# --- Helper Functions (some adapted from your notebook) ---
def time_to_index(t, delta_t=STEP_DURATION):
    """Converts time to a slot index."""
    return int(round(t / delta_t))

def index_to_time(idx, delta_t=STEP_DURATION):
    """Converts a slot index back to time."""
    return round(idx * delta_t, 2) # Keep precision consistent

In [144]:

def load_and_preprocess_data(file_path):
    """
    Loads data from JSON and preprocesses it for ILS.
    Returns a dictionary of processed data.
    """
    with open(file_path, 'r') as file:
        data = json.load(file)

    processed_data = {}

    # Energy Prices
    energy_prices_df = pd.DataFrame(data['energy_prices'], columns=['time', 'price'])
    processed_data['energy_prices_df'] = energy_prices_df

    # EV Arrivals
    arrivals_df = pd.DataFrame(data['arrivals'], columns=[
        'id', 'arrival_time', 'departure_time', 'required_energy', 'brand',
        'min_charge_rate', 'max_charge_rate', 'ac_charge_rate',
        'dc_charge_rate', 'willingness_to_pay'
    ])
    # Ensure consistent indexing for EVs (0 to N-1)
    arrivals_df = arrivals_df.sort_values(by='id').reset_index(drop=True)
    arrivals_df['ev_idx'] = arrivals_df.index # ev_idx will be 0 to num_evs-1

    arrivals_df['arrival_slot_idx'] = arrivals_df['arrival_time'].apply(lambda t: time_to_index(t, STEP_DURATION))
    arrivals_df['departure_slot_idx'] = arrivals_df['departure_time'].apply(lambda t: time_to_index(t, STEP_DURATION))
    
    # Calculate min/max energy per slot based on rates
    arrivals_df['min_energy_per_slot'] = arrivals_df['min_charge_rate'] * STEP_DURATION
    arrivals_df['max_energy_per_slot'] = arrivals_df['max_charge_rate'] * STEP_DURATION

    processed_data['arrivals_df'] = arrivals_df
    processed_data['num_evs'] = len(arrivals_df)

    # Chargers
    parking_config = data.get('parking_config', {})
    chargers_df = pd.DataFrame(parking_config['chargers'], columns=[
        "charger_id", "power", "type", "operation_cost_per_hour", "compatible_vehicles"
    ])
    # Ensure consistent indexing for chargers (0 to M-1)
    chargers_df = chargers_df.sort_values(by='charger_id').reset_index(drop=True)
    chargers_df['charger_idx'] = chargers_df.index # charger_idx will be 0 to num_chargers-1
    chargers_df['energy_capacity_per_slot'] = chargers_df['power'] * STEP_DURATION
    processed_data['chargers_df'] = chargers_df
    processed_data['num_chargers'] = len(chargers_df)

    processed_data['transformer_limit_kw'] = parking_config['transformer_limit']
    processed_data['transformer_energy_limit_per_slot'] = parking_config['transformer_limit'] * STEP_DURATION
    
    # Max time slot
    # Ensure max_departure_time is valid before processing
    if arrivals_df['departure_time'].empty:
        max_departure_time = 0 # Default if no departures
    else:
        max_departure_time = arrivals_df['departure_time'].max()
    
    processed_data['num_time_slots'] = time_to_index(max_departure_time, STEP_DURATION) + 1 # Slots are 0-indexed

    # Create a direct mapping for energy prices per time_slot_idx
    time_slots_actual = [index_to_time(i, STEP_DURATION) for i in range(processed_data['num_time_slots'])]
    all_slots_df = pd.DataFrame({'time': time_slots_actual})
    
    if not energy_prices_df.empty:
        merged_prices_df = pd.merge_asof(all_slots_df.sort_values('time'), 
                                         energy_prices_df.sort_values('time'), 
                                         on='time', 
                                         direction='backward')
        # Forward fill any remaining NaNs (e.g., if first slot is before first price point)
        # Then backfill any at the very beginning if still NaN
        merged_prices_df['price'] = merged_prices_df['price'].fillna(method='ffill').fillna(method='bfill').fillna(0) 
        processed_data['energy_prices_at_t_idx'] = merged_prices_df['price'].values
    else: # Handle case with no energy prices
        processed_data['energy_prices_at_t_idx'] = np.zeros(processed_data['num_time_slots'])


    # Charger compatibility matrix A[ev_idx, charger_idx]
    A = np.zeros((processed_data['num_evs'], processed_data['num_chargers']), dtype=int)
    for ev_r_idx, ev_row in arrivals_df.iterrows():
        # Extract base brand name (e.g., "Tesla Model S" from "Tesla Model S 100kWh")
        # Your notebook uses: ev_row["brand"].str.rsplit(' ').str[:-1].apply(' '.join)
        # A simpler way for single string:
        parts = ev_row["brand"].split(' ')
        if parts[-1].lower().endswith('kwh'): # Check if last part indicates capacity
            ev_brand_base = " ".join(parts[:-1])
        else:
            ev_brand_base = ev_row["brand"]
            
        for ch_r_idx, ch_row in chargers_df.iterrows():
            if ev_brand_base in ch_row["compatible_vehicles"]:
                 A[ev_row['ev_idx'], ch_row['charger_idx']] = 1
    processed_data['compatibility_matrix'] = A
    
    # Penalty for unserved energy
    if not energy_prices_df['price'].empty and energy_prices_df['price'].max() > 0 :
        processed_data['lambda_penalty_unserved_energy'] = energy_prices_df['price'].max() * 10
    else:
        processed_data['lambda_penalty_unserved_energy'] = 1000 # Default if no prices or all prices are zero

    return processed_data

In [145]:
# --- Objective Function ---
def calculate_objective(solution_P_ict, processed_data):
    """
    Calculates the objective value (total cost + penalties) for a given schedule.
    solution_P_ict: 3D NumPy array [ev_idx, charger_idx, time_idx] -> energy supplied.
    """
    arrivals_df = processed_data['arrivals_df']
    energy_prices_at_t_idx = processed_data['energy_prices_at_t_idx']
    lambda_penalty = processed_data['lambda_penalty_unserved_energy']
    num_evs, num_chargers, num_time_slots = solution_P_ict.shape

    total_charging_cost = 0
    total_penalty_cost = 0

    energy_per_slot = np.sum(solution_P_ict, axis=(0, 1)) 
    total_charging_cost = np.sum(energy_per_slot * energy_prices_at_t_idx)
        
    for ev_idx in range(num_evs):
        ev_info = arrivals_df.iloc[ev_idx]
        required_energy = ev_info['required_energy']
        supplied_energy = np.sum(solution_P_ict[ev_idx, :, :]) 
        
        unserved_energy = max(0, required_energy - supplied_energy)
        total_penalty_cost += lambda_penalty * unserved_energy
        
    return total_charging_cost + total_penalty_cost

In [146]:
# --- Feasibility Check ---
def is_solution_feasible(P_ict, ev_to_charger_map, processed_data, verbose=False):
    """
    Checks if a given solution (P_ict, ev_to_charger_map) is feasible.
    """
    arrivals_df = processed_data['arrivals_df']
    chargers_df = processed_data['chargers_df']
    compatibility_matrix = processed_data['compatibility_matrix']
    transformer_energy_limit_per_slot = processed_data['transformer_energy_limit_per_slot']
    num_evs, num_chargers, num_time_slots = P_ict.shape
    epsilon = 1e-6 # Tolerance for float comparisons

    for t_idx in range(num_time_slots):
        current_transformer_load = np.sum(P_ict[:, :, t_idx])
        if current_transformer_load > transformer_energy_limit_per_slot + epsilon:
            if verbose: print(f"Feasibility Error: Transformer limit exceeded at t_idx {t_idx}. Load: {current_transformer_load:.2f}, Limit: {transformer_energy_limit_per_slot:.2f}")
            return False

        for c_idx in range(num_chargers):
            charger_info = chargers_df.iloc[c_idx]
            current_charger_load_on_charger = np.sum(P_ict[:, c_idx, t_idx])
            if current_charger_load_on_charger > charger_info['energy_capacity_per_slot'] + epsilon:
                if verbose: print(f"Feasibility Error: Charger {c_idx} capacity exceeded at t_idx {t_idx}. Load: {current_charger_load_on_charger:.2f}, Capacity: {charger_info['energy_capacity_per_slot']:.2f}")
                return False
            
            evs_on_charger_this_slot = [ev_idx for ev_idx in range(num_evs) if P_ict[ev_idx, c_idx, t_idx] > epsilon]
            if len(evs_on_charger_this_slot) > 1:
                if verbose: print(f"Feasibility Error: Multiple EVs ({evs_on_charger_this_slot}) on charger {c_idx} at t_idx {t_idx}")
                return False

    for ev_idx in range(num_evs):
        ev_info = arrivals_df.iloc[ev_idx]
        assigned_charger_from_map = ev_to_charger_map[ev_idx]
        chargers_providing_energy_to_ev = [c_idx for c_idx in range(num_chargers) if np.sum(P_ict[ev_idx, c_idx, :]) > epsilon]

        if len(chargers_providing_energy_to_ev) > 1:
            if verbose: print(f"Feasibility Error: EV {ev_idx} receives energy from multiple chargers: {chargers_providing_energy_to_ev}")
            return False
        
        current_assigned_charger_in_P = -1
        if len(chargers_providing_energy_to_ev) == 1:
            current_assigned_charger_in_P = chargers_providing_energy_to_ev[0]
            if assigned_charger_from_map != -1 and assigned_charger_from_map != current_assigned_charger_in_P:
                 if verbose: print(f"Feasibility Warning: EV {ev_idx} map ({assigned_charger_from_map}) vs P_ict ({current_assigned_charger_in_P}) mismatch. P_ict is primary.")
        elif assigned_charger_from_map != -1 and len(chargers_providing_energy_to_ev) == 0 :
            if verbose: print(f"Feasibility Info: EV {ev_idx} mapped to {assigned_charger_from_map} but no energy in P_ict.")


        if current_assigned_charger_in_P != -1: 
            if compatibility_matrix[ev_idx, current_assigned_charger_in_P] == 0:
                if verbose: print(f"Feasibility Error: EV {ev_idx} incompatible with charger {current_assigned_charger_in_P} it's receiving energy from.")
                return False

            total_energy_for_ev_calc = 0
            for t_idx in range(num_time_slots):
                energy_this_slot = P_ict[ev_idx, current_assigned_charger_in_P, t_idx]
                total_energy_for_ev_calc += energy_this_slot

                if energy_this_slot > epsilon: 
                    if not (ev_info['arrival_slot_idx'] <= t_idx < ev_info['departure_slot_idx']):
                        if verbose: print(f"Feasibility Error: EV {ev_idx} (slots {ev_info['arrival_slot_idx']}-{ev_info['departure_slot_idx']-1}) charging outside window at t_idx {t_idx}")
                        return False
                    
                    if energy_this_slot > ev_info['max_energy_per_slot'] + epsilon:
                        if verbose: print(f"Feasibility Error: EV {ev_idx} exceeded max_energy_per_slot ({ev_info['max_energy_per_slot']:.2f}) with {energy_this_slot:.2f} at t_idx {t_idx}")
                        return False
                    
                    # Min energy per slot check: if charging, must be >= min_energy_per_slot,
                    # UNLESS it's the last bit of energy needed and that bit is < min_energy_per_slot.
                    remaining_needed_for_ev_before_this_slot = ev_info['required_energy'] - (total_energy_for_ev_calc - energy_this_slot)
                    if energy_this_slot < ev_info['min_energy_per_slot'] - epsilon and \
                       energy_this_slot < remaining_needed_for_ev_before_this_slot - epsilon:
                         if verbose: print(f"Feasibility Error: EV {ev_idx} below min_energy_per_slot ({ev_info['min_energy_per_slot']:.2f}) with {energy_this_slot:.2f} at t_idx {t_idx} (rem needed before this: {remaining_needed_for_ev_before_this_slot:.2f})")
                         return False 
    return True

In [147]:
# --- Initial Solution Generation (Adapting Price-Aware EDF) ---
def generate_initial_solution_pa_edf(processed_data):
    arrivals_df = processed_data['arrivals_df'].copy() 
    chargers_df = processed_data['chargers_df'].copy()
    energy_prices_at_t_idx = processed_data['energy_prices_at_t_idx']
    compatibility_matrix = processed_data['compatibility_matrix']
    transformer_energy_limit_per_slot = processed_data['transformer_energy_limit_per_slot']
    
    num_evs = processed_data['num_evs']
    num_chargers = processed_data['num_chargers']
    num_time_slots = processed_data['num_time_slots']
    epsilon = 1e-6

    P_ict = np.zeros((num_evs, num_chargers, num_time_slots))
    ev_to_charger_map = np.full(num_evs, -1, dtype=int)
    
    sorted_evs_df = arrivals_df.sort_values(by='departure_slot_idx')
    sorted_charger_indices = chargers_df.sort_values(by='power', ascending=False)['charger_idx'].tolist()

    # Keep track of actual transformer load based on P_ict as it's built
    # current_transformer_load_per_slot = np.zeros(num_time_slots) # Not needed if P_ict is source of truth

    for _, ev_row in sorted_evs_df.iterrows():
        ev_idx = ev_row['ev_idx']
        required_energy = ev_row['required_energy']
        min_energy_per_slot_ev = ev_row['min_energy_per_slot']
        max_energy_per_slot_ev = ev_row['max_energy_per_slot']
        arrival_slot = ev_row['arrival_slot_idx']
        departure_slot = ev_row['departure_slot_idx'] 

        energy_assigned_to_ev_total = 0 # Total energy assigned to this EV across all its slots

        for charger_idx in sorted_charger_indices:
            if ev_to_charger_map[ev_idx] != -1: break # EV already assigned by a previous charger iteration (should not happen with this loop structure)

            if compatibility_matrix[ev_idx, charger_idx] == 0:
                continue 

            charger_info = chargers_df.iloc[charger_idx]
            charger_capacity_per_slot = charger_info['energy_capacity_per_slot']
            
            # Check if this charger can accommodate this EV (is it free during EV's stay?)
            charger_is_available_for_ev = True
            for t_check in range(arrival_slot, departure_slot):
                if np.sum(P_ict[:, charger_idx, t_check]) > epsilon: # If any *other* EV is on this charger
                    charger_is_available_for_ev = False
                    break
            if not charger_is_available_for_ev:
                continue

            # Try to schedule this EV on this charger
            current_energy_for_ev_on_this_charger = 0
            temp_schedule_for_ev_on_charger = np.zeros(num_time_slots)


            price_slot_pairs = []
            for t_schedule in range(arrival_slot, departure_slot):
                 price_slot_pairs.append((energy_prices_at_t_idx[t_schedule], t_schedule))
            
            sorted_price_slots_for_ev = sorted(price_slot_pairs, key=lambda x: x[0])

            for price, t_idx in sorted_price_slots_for_ev:
                if current_energy_for_ev_on_this_charger >= required_energy - epsilon:
                    break 

                # Max energy we can assign in this slot
                energy_still_needed = required_energy - current_energy_for_ev_on_this_charger
                
                can_assign = energy_still_needed
                can_assign = min(can_assign, max_energy_per_slot_ev)
                can_assign = max(can_assign,min_energy_per_slot_ev)
                can_assign = min(can_assign, charger_capacity_per_slot - np.sum(P_ict[:, charger_idx, t_idx])) # Available on this charger
                can_assign = min(can_assign, transformer_energy_limit_per_slot - np.sum(P_ict[:, :, t_idx])) # Available on transformer
                can_assign = max(0, can_assign) # Ensure non-negative

                actual_assigned_this_slot = 0
                if can_assign >= min_energy_per_slot_ev - epsilon:
                    actual_assigned_this_slot = can_assign
                elif can_assign > epsilon and can_assign >= energy_still_needed - epsilon: # If less than min_rate, but it's the remainder
                    actual_assigned_this_slot = can_assign
                
                if actual_assigned_this_slot > epsilon:
                    temp_schedule_for_ev_on_charger[t_idx] = actual_assigned_this_slot
                    current_energy_for_ev_on_this_charger += actual_assigned_this_slot
            
            if current_energy_for_ev_on_this_charger > epsilon: # If any energy was scheduled for this EV on this charger
                # Commit this assignment to global P_ict
                P_ict[ev_idx, charger_idx, :] = temp_schedule_for_ev_on_charger[:]
                ev_to_charger_map[ev_idx] = charger_idx
                energy_assigned_to_ev_total = current_energy_for_ev_on_this_charger
                break # EV is assigned, move to next EV

    objective_val = calculate_objective(P_ict, processed_data)
    return {'P_ict': P_ict, 'ev_to_charger_map': ev_to_charger_map, 'objective': objective_val}


In [148]:

# --- Local Search Operators ---
def local_search_shift_energy(io_P_ict, io_ev_map, ev_idx, processed_data):
    """
    Tries to shift energy for a single EV (ev_idx) on its current charger
    from more expensive slots to cheaper slots to reduce cost.
    Modifies io_P_ict IN PLACE if improvement found.
    Returns True if an improvement was made, False otherwise.
    """
    ev_info = processed_data['arrivals_df'].iloc[ev_idx]
    charger_idx = io_ev_map[ev_idx]
    epsilon = 1e-6

    if charger_idx == -1: return False

    charger_info = processed_data['chargers_df'].iloc[charger_idx]
    energy_prices = processed_data['energy_prices_at_t_idx']
    
    ev_schedule_on_charger = [] 
    for t_idx in range(ev_info['arrival_slot_idx'], ev_info['departure_slot_idx']):
        if io_P_ict[ev_idx, charger_idx, t_idx] > epsilon:
            ev_schedule_on_charger.append({'t_idx': t_idx, 
                                           'energy': io_P_ict[ev_idx, charger_idx, t_idx], 
                                           'price': energy_prices[t_idx]})
    
    if not ev_schedule_on_charger: return False

    slots_to_shift_from = sorted(ev_schedule_on_charger, key=lambda x: x['price'], reverse=True)
    
    improved_overall = False
    for from_slot_data in slots_to_shift_from:
        from_t_idx = from_slot_data['t_idx']
        
        # Potential slots to shift to (must be cheaper and within EV's window)
        potential_to_slots = []
        for t_idx_to_check in range(ev_info['arrival_slot_idx'], ev_info['departure_slot_idx']):
            if t_idx_to_check == from_t_idx: continue
            if energy_prices[t_idx_to_check] < from_slot_data['price'] - epsilon:
                 potential_to_slots.append({'t_idx': t_idx_to_check, 
                                            'price': energy_prices[t_idx_to_check]})
        
        sorted_potential_to_slots = sorted(potential_to_slots, key=lambda x: x['price'])

        for to_slot_data in sorted_potential_to_slots:
            to_t_idx = to_slot_data['t_idx']
            
            # Energy available to move from 'from_t_idx' for this EV
            energy_at_from_slot_ev = io_P_ict[ev_idx, charger_idx, from_t_idx]
            if energy_at_from_slot_ev < epsilon: continue # Nothing left to move from this source

            # Capacity at 'to_t_idx'
            # 1. EV max rate
            capacity_to_slot_ev_max_rate = ev_info['max_energy_per_slot'] - io_P_ict[ev_idx, charger_idx, to_t_idx]
            # 2. Charger capacity
            charger_load_others_to_slot = np.sum(io_P_ict[:, charger_idx, to_t_idx]) - io_P_ict[ev_idx, charger_idx, to_t_idx]
            capacity_to_slot_charger = charger_info['energy_capacity_per_slot'] - charger_load_others_to_slot
            # 3. Transformer capacity
            transformer_load_others_to_slot = np.sum(io_P_ict[:, :, to_t_idx]) - io_P_ict[ev_idx, charger_idx, to_t_idx] # All other energy in this slot
            capacity_to_slot_transformer = processed_data['transformer_energy_limit_per_slot'] - transformer_load_others_to_slot
            
            max_can_receive_in_to_slot = min(capacity_to_slot_ev_max_rate, capacity_to_slot_charger, capacity_to_slot_transformer)
            max_can_receive_in_to_slot = max(0, max_can_receive_in_to_slot)

            amount_to_move = min(energy_at_from_slot_ev, max_can_receive_in_to_slot)

            if amount_to_move > epsilon:
                # Tentative move
                new_energy_at_from = io_P_ict[ev_idx, charger_idx, from_t_idx] - amount_to_move
                new_energy_at_to = io_P_ict[ev_idx, charger_idx, to_t_idx] + amount_to_move

                valid_move = True
                # Check min energy constraints if slots are still active
                if new_energy_at_from > epsilon and new_energy_at_from < ev_info['min_energy_per_slot'] - epsilon:
                    valid_move = False
                if new_energy_at_to < ev_info['min_energy_per_slot'] - epsilon: # Must meet min if now active
                    # Exception: if total required energy for EV is very small.
                    # This check might be too strict if the EV only needs a tiny bit more.
                    # For now, if it's charging, it should meet its min rate for that slot.
                    pass # Relaxing this for now, assuming initial heuristic or other checks handle total.

                if valid_move:
                    io_P_ict[ev_idx, charger_idx, from_t_idx] = new_energy_at_from
                    io_P_ict[ev_idx, charger_idx, to_t_idx] = new_energy_at_to
                    improved_overall = True
                    # Since energy moved from 'from_slot_data', its available energy changed.
                    # The outer loop will continue with the original list, but P_ict is updated.
                    # This is a greedy shift. More sophisticated might re-evaluate all slots.
    return improved_overall


In [149]:

def local_search(current_solution, processed_data, max_ls_iters=10):
    best_solution_ls = copy.deepcopy(current_solution) 
    
    for iter_ls in range(max_ls_iters):
        improved_in_this_pass = False
        
        ev_indices = list(range(processed_data['num_evs']))
        random.shuffle(ev_indices)

        for ev_idx in ev_indices:
            # Operator 1: Shift energy for this EV on its current charger
            if local_search_shift_energy(best_solution_ls['P_ict'], best_solution_ls['ev_to_charger_map'], ev_idx, processed_data):
                improved_in_this_pass = True
        
        if improved_in_this_pass:
            best_solution_ls['objective'] = calculate_objective(best_solution_ls['P_ict'], processed_data)
        else: # No improvement in a full pass over all EVs for this operator
            break 
            
    # Final check for feasibility of the LS output
    if not is_solution_feasible(best_solution_ls['P_ict'], best_solution_ls['ev_to_charger_map'], processed_data, verbose=False):
       # print("Warning: Local search resulted in an infeasible solution. Reverting to input of LS.")
       # return copy.deepcopy(current_solution) # Return original if LS broke feasibility
       # For ILS, it's often better to let the acceptance criteria handle potentially infeasible intermediates if LS can't fix them
       pass

    best_solution_ls['objective'] = calculate_objective(best_solution_ls['P_ict'], processed_data)
    return best_solution_ls

In [150]:

# --- Perturbation Operator ---
def perturb_solution(solution_to_perturb, processed_data, perturbation_strength=0.1):
    perturbed_solution = copy.deepcopy(solution_to_perturb)
    P_ict = perturbed_solution['P_ict']
    ev_to_charger_map = perturbed_solution['ev_to_charger_map'] 
    
    num_evs = processed_data['num_evs']
    arrivals_df = processed_data['arrivals_df']
    chargers_df = processed_data['chargers_df']
    compatibility_matrix = processed_data['compatibility_matrix']
    energy_prices_at_t_idx = processed_data['energy_prices_at_t_idx']
    transformer_energy_limit_per_slot = processed_data['transformer_energy_limit_per_slot']
    epsilon = 1e-6

    num_evs_to_perturb = max(1, int(num_evs * perturbation_strength)) 
    
    ev_indices_all = list(range(num_evs))
    random.shuffle(ev_indices_all)
    evs_to_perturb_indices = ev_indices_all[:num_evs_to_perturb]

    for ev_idx in evs_to_perturb_indices:
        assigned_charger = ev_to_charger_map[ev_idx]
        if assigned_charger != -1:
            P_ict[ev_idx, assigned_charger, :] = 0 
        ev_to_charger_map[ev_idx] = -1 

    # Re-insert EVs using a logic similar to initial heuristic but considering existing P_ict
    sorted_charger_indices = chargers_df.sort_values(by='power', ascending=False)['charger_idx'].tolist()

    for ev_idx in evs_to_perturb_indices: 
        ev_info = arrivals_df.iloc[ev_idx]
        required_energy = ev_info['required_energy']
        min_energy_per_slot_ev = ev_info['min_energy_per_slot']
        max_energy_per_slot_ev = ev_info['max_energy_per_slot']
        arrival_slot = ev_info['arrival_slot_idx']
        departure_slot = ev_info['departure_slot_idx']

        best_charger_found_for_reinsert = -1
        best_schedule_for_reinsert = np.zeros(processed_data['num_time_slots'])
        max_energy_reinserted = 0

        for charger_idx in sorted_charger_indices:
            if compatibility_matrix[ev_idx, charger_idx] == 0:
                continue

            charger_info = chargers_df.iloc[charger_idx]
            charger_capacity_per_slot = charger_info['energy_capacity_per_slot']
            
            current_try_schedule = np.zeros(processed_data['num_time_slots'])
            current_try_energy = 0

            price_slot_pairs = []
            for t_schedule in range(arrival_slot, departure_slot):
                 price_slot_pairs.append((energy_prices_at_t_idx[t_schedule], t_schedule))
            sorted_price_slots_for_ev = sorted(price_slot_pairs, key=lambda x: x[0])


            for price, t_idx in sorted_price_slots_for_ev:
                if current_try_energy >= required_energy - epsilon:
                    break

                energy_still_needed = required_energy - current_try_energy
                
                # Check existing load on this charger from *other* EVs (P_ict is updated from previous re-insertions)
                load_on_charger_others = np.sum(P_ict[:, charger_idx, t_idx]) # Assumes P_ict[ev_idx, charger_idx, t_idx] is 0 for this EV currently
                
                # Check existing load on transformer from *other* EVs/chargers
                load_on_transformer_others = np.sum(P_ict[:, :, t_idx])


                can_assign = energy_still_needed
                can_assign = min(can_assign, max_energy_per_slot_ev)
                can_assign = min(can_assign, charger_capacity_per_slot - load_on_charger_others) 
                can_assign = min(can_assign, transformer_energy_limit_per_slot - load_on_transformer_others)
                can_assign = max(0, can_assign)

                actual_assigned_this_slot = 0
                if can_assign >= min_energy_per_slot_ev - epsilon:
                    actual_assigned_this_slot = can_assign
                elif can_assign > epsilon and can_assign >= energy_still_needed - epsilon:
                    actual_assigned_this_slot = can_assign
                
                if actual_assigned_this_slot > epsilon:
                    current_try_schedule[t_idx] = actual_assigned_this_slot
                    current_try_energy += actual_assigned_this_slot
            
            if current_try_energy > max_energy_reinserted: # Prioritize more energy fulfillment
                max_energy_reinserted = current_try_energy
                best_charger_found_for_reinsert = charger_idx
                best_schedule_for_reinsert = current_try_schedule.copy()
            
            if max_energy_reinserted >= required_energy - epsilon:
                break # Fully satisfied this EV
        
        if best_charger_found_for_reinsert != -1 and max_energy_reinserted > epsilon:
            P_ict[ev_idx, best_charger_found_for_reinsert, :] = best_schedule_for_reinsert[:]
            ev_to_charger_map[ev_idx] = best_charger_found_for_reinsert
        # Else: EV might remain unassigned or partially assigned if no good spot found during re-insertion

    perturbed_solution['objective'] = calculate_objective(P_ict, processed_data)
    return perturbed_solution

In [151]:
def repair_solution(solution, processed_data):
    """
    Repairs a solution to satisfy all major constraints:
    1. One EV per charger per time slot.
    2. Transformer and Charger load limits.
    3. EV maximum charge rate per slot.
    4. EV minimum charge rate per slot (if charging).
    Modifies the solution's P_ict in place and returns the repaired solution dict.
    """
    P_ict = solution['P_ict']
    ev_map = solution['ev_to_charger_map'] # This map might be inconsistent after search, P_ict is truth
    
    arrivals_df = processed_data['arrivals_df']
    chargers_df = processed_data['chargers_df']
    transformer_limit_per_slot = processed_data['transformer_energy_limit_per_slot']
    epsilon = 1e-6

    num_evs, num_chargers, num_time_slots = P_ict.shape

    # --- Iterate through each time slot to fix load and rate issues ---
    for t_idx in range(num_time_slots):
        
        # --- Stage 1: Enforce Charger & EV Max Rate Constraints ---
        # These are hard physical limits, so we clamp them first.
        for c_idx in range(num_chargers):
            charger_limit = chargers_df.iloc[c_idx]['energy_capacity_per_slot']
            
            # Find all EVs trying to charge on this charger at this time
            evs_on_charger = [ev_idx for ev_idx in range(num_evs) if P_ict[ev_idx, c_idx, t_idx] > epsilon]
            
            # A) Enforce EV max rate for each EV individually
            for ev_idx in evs_on_charger:
                ev_max_rate = arrivals_df.iloc[ev_idx]['max_energy_per_slot']
                if P_ict[ev_idx, c_idx, t_idx] > ev_max_rate:
                    P_ict[ev_idx, c_idx, t_idx] = ev_max_rate # Clamp to EV's max rate
            
            # B) Enforce one-EV-per-charger and charger capacity
            # If more than one EV is on the charger, we must remove all but one.
            # A simple heuristic: keep the one with the highest required_energy.
            if len(evs_on_charger) > 1:
                ev_energies = {ev_idx: arrivals_df.iloc[ev_idx]['required_energy'] for ev_idx in evs_on_charger}
                survivor_ev_idx = max(ev_energies, key=ev_energies.get)
                for ev_idx in evs_on_charger:
                    if ev_idx != survivor_ev_idx:
                        P_ict[ev_idx, c_idx, t_idx] = 0 # Kick off other EVs

            # C) After clamping/kicking, check charger capacity again
            charger_load = np.sum(P_ict[:, c_idx, t_idx])
            if charger_load > charger_limit:
                 # This should only happen for the one surviving EV now. Clamp its energy.
                 # Find the survivor (should be only one non-zero)
                 for ev_idx in range(num_evs):
                     if P_ict[ev_idx, c_idx, t_idx] > epsilon:
                         P_ict[ev_idx, c_idx, t_idx] = charger_limit
                         break

        # --- Stage 2: Enforce Transformer Load Constraint ---
        # After charger and EV limits are fixed, fix the overall system limit.
        total_load = np.sum(P_ict[:, :, t_idx])
        if total_load > transformer_limit_per_slot:
            excess_load = total_load - transformer_limit_per_slot
            
            # Proportional reduction across all charging EVs in this slot
            contributors = []
            for ev_idx in range(num_evs):
                for c_idx in range(num_chargers):
                    if P_ict[ev_idx, c_idx, t_idx] > epsilon:
                        contributors.append((ev_idx, c_idx))
            
            if contributors: # Should always be true if total_load > 0
                for ev_idx, c_idx in contributors:
                    reduction = (P_ict[ev_idx, c_idx, t_idx] / total_load) * excess_load
                    P_ict[ev_idx, c_idx, t_idx] -= reduction

    # --- Stage 3: Final Cleanup - Enforce EV Minimum Rate ---
    # After all reductions, some charges might be invalidly small.
    for ev_idx in range(num_evs):
        min_energy = arrivals_df.iloc[ev_idx]['min_energy_per_slot']
        for c_idx in range(num_chargers):
            for t_idx in range(num_time_slots):
                energy = P_ict[ev_idx, c_idx, t_idx]
                # If energy is greater than zero but less than the minimum required, it's an invalid state.
                # So, we set it to zero as per the "make the energy 0 or a minimum value" strategy.
                if 0 < energy < min_energy - epsilon:
                    P_ict[ev_idx, c_idx, t_idx] = 0
    
    # After all repairs, the ev_to_charger_map might be out of date. Rebuild it from P_ict.
    solution['ev_to_charger_map'] = np.full(num_evs, -1, dtype=int)
    for ev_idx in range(num_evs):
        for c_idx in range(num_chargers):
            if np.sum(P_ict[ev_idx, c_idx, :]) > epsilon:
                solution['ev_to_charger_map'][ev_idx] = c_idx
                break # Found the assigned charger for this EV

    # Finally, recalculate the objective for the now-feasible solution
    solution['objective'] = calculate_objective(P_ict, processed_data)
    
    return solution

In [152]:

# --- Iterated Local Search Algorithm ---
def iterated_local_search(processed_data, max_iterations, max_no_improvement_ils,
                           initial_heuristic_type='pa_edf', 
                           max_ls_iters_per_call=10,
                           perturbation_strength=0.1):
    start_time_ils = time.time()

    print("Generating initial solution for ILS...")
    if initial_heuristic_type == 'pa_edf':
        current_solution = generate_initial_solution_pa_edf(processed_data)
    else: 
        print(f"Warning: Unknown initial_heuristic_type '{initial_heuristic_type}'. Using PA-EDF.")
        current_solution = generate_initial_solution_pa_edf(processed_data)

    if not is_solution_feasible(current_solution['P_ict'], current_solution['ev_to_charger_map'], processed_data, verbose=True):
         print("ERROR: Initial solution for ILS is infeasible. Aborting.")
         return None 
         
    print(f"Initial solution objective: {current_solution['objective']:.2f}")

    print("Applying local search to initial solution...")
    best_solution_overall = local_search(current_solution, processed_data, max_ls_iters=max_ls_iters_per_call)
    print(f"After initial local search, best objective: {best_solution_overall['objective']:.2f}")

    if not is_solution_feasible(best_solution_overall['P_ict'], best_solution_overall['ev_to_charger_map'], processed_data, verbose=True):
        print("ERROR: Solution after initial local search is infeasible. Using original initial solution as current best.")
        best_solution_overall = copy.deepcopy(current_solution) # Fallback

    current_best_in_ils_run = copy.deepcopy(best_solution_overall) 
    iterations_without_improvement_count = 0

    for i in range(max_iterations):
        print(f"\nILS Iteration {i+1}/{max_iterations}")
        
        print(f"Perturbing solution from obj: {current_best_in_ils_run['objective']:.2f}...")
        perturbed_s = perturb_solution(current_best_in_ils_run, processed_data, perturbation_strength)
        
        # It's good practice for perturbation to aim for feasibility, but LS should handle/fix minor issues.
        is_perturbed_feasible = is_solution_feasible(perturbed_s['P_ict'], perturbed_s['ev_to_charger_map'], processed_data, verbose=False)
        if not is_perturbed_feasible:
                # --- ADD REPAIR AFTER PERTURBATION ---
            print(f"Repairing perturbed solution (before repair obj: {perturbed_s['objective']:.2f})...")
            perturbed_s = repair_solution(perturbed_s, processed_data)
            print(f"Perturbed solution objective after repair: {perturbed_s['objective']:.2f}")
            is_perturbed_feasible=is_solution_feasible(perturbed_s['P_ict'], perturbed_s['ev_to_charger_map'], processed_data, verbose=False)
            print(f'Perturbed solution is now feasible? {is_perturbed_feasible}')
        
        print(f"Perturbed solution objective: {perturbed_s['objective']:.2f}")

        print("Applying local search to perturbed solution...")
        candidate_s = local_search(perturbed_s, processed_data, max_ls_iters=max_ls_iters_per_call)
        print(f"Candidate objective after LS: {candidate_s['objective']:.2f}")

        is_candidate_feasible = is_solution_feasible(candidate_s['P_ict'], candidate_s['ev_to_charger_map'], processed_data, verbose=False)

        if is_candidate_feasible==False:
                # --- ADD REPAIR AFTER Local Search ---
            print(f"Repairing candidate solution (before repair obj: {candidate_s['objective']:.2f})...")
            candidate_s = repair_solution(candidate_s, processed_data)
            print(f"Candidate solution objective after repair: {candidate_s['objective']:.2f}")
            if perturbed_s['objective']<candidate_s['objective']:
                candidate_s=perturbed_s


        is_candidate_feasible = is_solution_feasible(candidate_s['P_ict'], candidate_s['ev_to_charger_map'], processed_data, verbose=False)
        
        if is_candidate_feasible and candidate_s['objective'] < current_best_in_ils_run['objective']:
            current_best_in_ils_run = copy.deepcopy(candidate_s)
            print(f"ILS: New current best in run: {current_best_in_ils_run['objective']:.2f}")
            iterations_without_improvement_count = 0 
            
            if current_best_in_ils_run['objective'] < best_solution_overall['objective']:
                best_solution_overall = copy.deepcopy(current_best_in_ils_run)
                print(f"ILS: New GLOBAL BEST solution: {best_solution_overall['objective']:.2f}")
        else:
            if not is_candidate_feasible:
                print("ILS: Candidate from LS was infeasible. Not accepting.")
            # else: Candidate was feasible but not better than current_best_in_ils_run
            iterations_without_improvement_count += 1
            print(f"ILS: Iterations without improvement in current run: {iterations_without_improvement_count}")

        if iterations_without_improvement_count >= max_no_improvement_ils:
            print(f"ILS: Stopping early due to {max_no_improvement_ils} iterations without improvement in current run.")
            break
            
    end_time_ils = time.time()
    print(f"\nILS finished in {end_time_ils - start_time_ils:.2f} seconds.")
    print(f"Final best objective found by ILS: {best_solution_overall['objective']:.2f}")
    
    if not is_solution_feasible(best_solution_overall['P_ict'], best_solution_overall['ev_to_charger_map'], processed_data, verbose=True):
        print("CRITICAL WARNING: The final best solution from ILS is INFEASIBLE!")
    else:
        print("Final best solution from ILS is FEASIBLE.")

    return best_solution_overall


In [157]:

# --- Main Execution Example ---
if __name__ == '__main__':
    try:
        # Ensure 'test_system_1.json' is in the same directory as your script,
        # or provide the full path to the file.
        file_path = 'test_system_1.json' 
        processed_data = load_and_preprocess_data(file_path)
        print("Data loaded and preprocessed successfully.")
        print(f"Number of EVs: {processed_data['num_evs']}")
        print(f"Number of Chargers: {processed_data['num_chargers']}")
        print(f"Number of Time Slots: {processed_data['num_time_slots']}")

    except FileNotFoundError:
        print(f"Error: The file {file_path} was not found. Please check the path.")
        exit()
    except Exception as e:
        print(f"An error occurred during data loading or initial processing: {e}")
        import traceback
        traceback.print_exc()
        exit()

    # ILS Parameters (you'll need to tune these)
    MAX_ILS_ITERATIONS = 5     # Total number of ILS iterations
    MAX_NO_IMPROVEMENT_ILS = 5    # Stop if no improvement for this many ILS iterations in a row
    MAX_LS_ITERS_PER_CALL = 5     # Max passes for each call to local_search
    PERTURBATION_STRENGTH = 0.20  # Fraction of EVs to perturb (e.g., 20%)
    
    print("\nStarting Iterated Local Search...")
    final_solution = iterated_local_search(
        processed_data,
        max_iterations=MAX_ILS_ITERATIONS,
        max_no_improvement_ils=MAX_NO_IMPROVEMENT_ILS,
        initial_heuristic_type='pa_edf', # Can be 'pa_edf' or adapted 'sa_fcfs'
        max_ls_iters_per_call=MAX_LS_ITERS_PER_CALL,
        perturbation_strength=PERTURBATION_STRENGTH
    )

    if final_solution:
        print("\n--- Final ILS Solution Summary ---")
        print(f"Best Objective Value: {final_solution['objective']:.2f}")
        boolean_s=is_solution_feasible(final_solution['P_ict'], final_solution['ev_to_charger_map'], processed_data, verbose=False)

        print(f'la solución es factible?: {boolean_s}')
        total_energy_supplied_final = np.sum(final_solution['P_ict'])
        print(f"Total Energy Supplied in Final Solution: {total_energy_supplied_final:.2f} kWh")

        print(final_solution['P_ict'].shape)


  merged_prices_df['price'] = merged_prices_df['price'].fillna(method='ffill').fillna(method='bfill').fillna(0)


Data loaded and preprocessed successfully.
Number of EVs: 103
Number of Chargers: 10
Number of Time Slots: 281

Starting Iterated Local Search...
Generating initial solution for ILS...
Initial solution objective: 1311161.76
Applying local search to initial solution...
After initial local search, best objective: 1305659.50
Feasibility Error: Transformer limit exceeded at t_idx 7. Load: 8.50, Limit: 3.50
ERROR: Solution after initial local search is infeasible. Using original initial solution as current best.

ILS Iteration 1/5
Perturbing solution from obj: 1311161.76...
Repairing perturbed solution (before repair obj: 1289935.03)...
Perturbed solution objective after repair: 1290970.26
Perturbed solution is now feasible? True
Perturbed solution objective: 1290970.26
Applying local search to perturbed solution...
Candidate objective after LS: 1285245.01
Repairing candidate solution (before repair obj: 1285245.01)...
Candidate solution objective after repair: 1547531.96
ILS: New current b