# Added the correct RK calculation and added two categories for the response time calculation

In [1]:
import gurobipy as gp
import pandas as pd
from code_map import final_markets, new_meters, Inputs
import numpy as np
import calendar 
from datetime import datetime
import pytz
import openpyxl
import os
import pickle


In [None]:
#fcr_d_1_df_2022 = pd.read_excel("/Users/sandermeland/Documents/Jobb/Volte/master-kode/markets/markets-data/new_fcrd1.xlsx")
#fcr_d_2_df_2022 = pd.read_excel("/Users/sandermeland/Documents/Jobb/Volte/master-kode/markets/markets-data/new_fcrd2.xlsx")
#FCR DIRECTORIES
fcr_d_1_directory = "../master-data/markets-data/FCR_D-1-2023.xlsx"
fcr_d_2_directory = "../master-data/markets-data/FCR_D-2-2023.xlsx"

# aFRR DIRECTORIES
afrr_up_directory = '../master-data/markets-data/aFFR/up_2023'
afrr_down_directory = '../master-data/markets-data/aFFR/down_2023'

# RK DIRECTORIES
rk_price_down_path = "../master-data/markets-data/RK/new_rk_price_down.csv"
rk_price_up_path = "../master-data/markets-data/RK/new_rk_price_up.csv"
rk_volume_up_path = "../master-data/markets-data/RK/new_rk_vol_up.csv"
rk_volume_down_path = "../master-data/markets-data/RK/new_rk_vol_down.csv"

# RKOM DIRECTORIES
rkom_2022_path = "../master-data/markets-data/RKOM.xlsx"
rkom_2023_path = "../master-data/markets-data/Rkom-2023.xlsx"

#SPOT PRICE DIRECTORY
spot_path = "../master-data/spot_data/spot_june_23.csv"

# CATEGORY DIRECTORIES
cat_path_list = ["../master-data/categorization_data/harktech_meters.csv",  "../master-data/categorization_data/ev_meters.csv"]


In [None]:
timeframe = Inputs.one_day
consumption_data =pd.read_csv('../master-data/customers-data/added_type_and_comp.csv')


In [None]:
all_market_list = final_markets.get_market_list(tf = timeframe, spot_path=spot_path, fcr_d_1_path= fcr_d_1_directory, fcr_d_2_path=fcr_d_2_directory, afrr_up_directory=afrr_up_directory, afrr_down_directory=afrr_down_directory, rk_price_down_path=rk_price_down_path,rk_price_up_path= rk_price_up_path, rk_volume_up_path=rk_volume_up_path, rk_volume_down_path=rk_volume_down_path, rkom_22_path=rkom_2022_path, rkom_23_path= rkom_2023_path)
power_meter_dict = new_meters.create_meter_objects(consumption_data = consumption_data, tf= timeframe, reference_tf= Inputs.one_month, category_path_list=cat_path_list) 
freq_data = Inputs.get_frequency_data(timeframe, '../master-data/frequency_data/2023-06')
afrr_activation_up = Inputs.get_afrr_activation_data(tf = timeframe, afrr_directory = '../master-data/aFRR_activation/', direction = "Up")
afrr_activation_down = Inputs.get_afrr_activation_data(tf = timeframe, afrr_directory = '../master-data/aFRR_activation/', direction = "Down")

In [None]:
H = Inputs.get_timestamps(timeframe)

# Define the sets
L = list(power_meter_dict.values())  # List of PowerMeter objects
M = all_market_list  # List of ReserveMarket objects

F = Inputs.get_FCR_N_percentages(freq_data, H, M)

markets_dict = {market.name: market for market in M}

# make a list of only the meters that have direction up or both
L_u = [meter for meter in L if meter.direction != 'down']
L_d = [meter for meter in L if meter.direction != 'up']

In [None]:
print([market.name for market in M])

In [None]:
print(f"Amount of markets : {len(M)}")
print(f"Amount of meters : {len(L)}")
print(f"Amount of meters with direction up or both : {len(L_u)}")
print(f"Amount of meters with direction down or both : {len(L_d)}")
print(f"Amount of hours : {len(H)}")

In [None]:
Fu_h_l = np.array([[load.up_flex_volume["value"].loc[load.up_flex_volume["Time(Local)"] == hour].values[0] if load.direction != "down" else 0 for load in L] for hour in H]) # set of flex volumes for meters, if load.direction != "down"
Fd_h_l = np.array([[load.down_flex_volume["value"].loc[load.down_flex_volume["Time(Local)"] == hour].values[0] if load.direction != "up" else 0 for load in L] for hour in H]) # set of flex volumes for meters, if load.direction != "up"

R_h_l = np.array([[load.response_time for load in L]] * len(H)) # set of response times for meters

P_h_m = np.array([[market.price_data.loc[market.price_data["Time(Local)"] == hour].values[0][1] for market in M] for hour in H]) # set of prices for markets
Vp_h_m = np.array([[market.volume_data.loc[market.volume_data["Time(Local)"] == hour].values[0][1] for market in M] for hour in H]) # set of volumes for markets

Vm_m = [market.min_volume for market in M] # set of min values for markets
R_m = [market.response_time for market in M] # set of response times for markets


In [None]:
total_up_flex = np.sum(Fu_h_l) # total available flex volume up
total_down_flex = np.sum(Fd_h_l) # total available flex volume down
total_response_time = np.sum(R_h_l) # total response time
#total_flex = total_up_flex + total_down_flex
average_response_time = total_response_time/ (len(H)*len(L))
hourly_flex_up = total_up_flex/len(H)
hourly_flex_down = total_down_flex/len(H)

print(f"Total up flex volume: {total_up_flex} MW")
print(f"Total down flex volume: {total_down_flex} MW")
print(f"Average flex volume pr hour up: {hourly_flex_up} MWh")
print(f"Average flex volume pr hour down: {hourly_flex_down} MWh")
print(f"Average response time: {average_response_time} seconds")

In [None]:
def get_dominant_direction(freq_df : pd.DataFrame, hour : pd.Timestamp):
    """will find out which direction is dominant within an hour

    Args:
        freq_data (pd.DataFrame): dataframe of the frequency data
        hour (pd.Timestamp): the wanted hour
    """
    start_datetime = hour 
    end_datetime = hour + pd.Timedelta(hours=1)
        
    filtered_df = freq_df[(freq_df["Time"] >= start_datetime) & (freq_df["Time"] <= end_datetime)]
    #print(filtered_df)
    avg_freq = filtered_df["Value"].mean()
    #print(avg_freq)
    if avg_freq > 50.0:
        return "up"
    else:
        return "down"

In [None]:
dominant_directions = [get_dominant_direction(freq_data, hour) for hour in H]

In [None]:
"""
Ir_hlm = {} # reservation income for load l in market m at hour h
Ia_hlm = {} # activation income for load l in market m at hour h
Va_hm = {} # activation volume for load l in market m at hour h

for h, hour in enumerate(H):
    for m, market in enumerate(M):
        for l, load in enumerate(L):
            if market.direction == "both":
                if load.direction == "both":
                    if dominant_directions[h] == "up":
                        Ir_hlm[h,l,m] = Fu_h_l[h,l] * P_h_m[h,m]
                    else:
                        Ir_hlm[h,l ,m] = Fd_h_l[h,l] * P_h_m[h,m]
                    #I[h,l,m] =(Fu_h_l[h,l]+ Fd_h_l[h,l])/2 * P_h_m[h,m]
                else:
                    Ir_hlm[h,l,m] = 0
            elif market.direction == "up":
                if load.direction != "down":
                    Ir_hlm[h,l,m] = Fu_h_l[h,l] * P_h_m[h,m]
                else:
                    Ir_hlm[h,l,m] = 0
            else: # market.direction == "down"
                if load.direction != "up":
                    Ir_hlm[h,l,m] = Fd_h_l[h,l] * P_h_m[h,m]
                else:
                    Ir_hlm[h,l,m] = 0
            if market.capacity_market: 
                if "FCR_N" in market.name:
                    up_val, down_val = F[h,m]
                    Va_hm[h,m] = Vp_h_m[h,m] * (up_val + down_val) if (up_val + down_val) > 0 else 0
                    if load.direction == "both":
                        activation_income = (Fu_h_l[h,l] * up_val * markets_dict["RK_up_" + market.area].price_data.loc[markets_dict["RK_up_" + market.area].price_data["Time(Local)"] == hour].values[0][1] + 
                                            Fd_h_l[h,l] * down_val * markets_dict["RK_down_" + market.area].price_data.loc[markets_dict["RK_down_" + market.area].price_data["Time(Local)"] == hour].values[0][1])
                        # Add to the objective expression
                        Ia_hlm[h,l,m] = activation_income
                    else:
                        Ia_hlm[h,l,m] = 0
                elif "aFRR" in market.name: # will have to add the other markets later - especially aFRR and RKOM
                    if market.direction == "up":
                        col_name = [col for col in afrr_activation_up.columns if market.area in col][0]
                        activated_volume = afrr_activation_up[col_name].loc[afrr_activation_up["Time"] == hour].values[0] 
                        Va_hm[h,m] = activated_volume
                        if  load.direction != "down" and activated_volume > 0:
                            Ia_hlm[h,l,m] = Fu_h_l[h,l] * markets_dict["RK_up_" + market.area].price_data.loc[markets_dict["RK_up_" + market.area].price_data["Time(Local)"] == hour].values[0][1]
                        else:
                            Ia_hlm[h,l,m] = 0
                    elif market.direction == "down": 
                        col_name = [col for col in afrr_activation_down.columns if market.area in col][0]
                        activated_volume = afrr_activation_down[col_name].loc[afrr_activation_down["Time"] == hour].values[0]
                        Va_hm[h,m] = activated_volume
                        if load.direction != "up" and activated_volume > 0:
                            Ia_hlm[h,l,m] = Fd_h_l[h,l] * markets_dict["RK_down_" + market.area].price_data.loc[markets_dict["RK_down_" + market.area].price_data["Time(Local)"] == hour].values[0][1]
                        else:
                            Ia_hlm[h,l,m] = 0
                    else:
                        Ia_hlm[h,l,m] = 0
                    
                else: # No activation income, just regular income
                    Ia_hlm[h,l,m] = 0
                    Va_hm[h,m] = 0
            else:
                # No capacity market, just regular income
                Ir_hlm[h,l,m] = P_h_m[h,m] * Vp_h_m[h,m]
                Ia_hlm[h,l,m] = 0
                Va_hm[h,m] = 0
            
                """
                
# i may need to add a constraint to secure that the total activated volume in a market is less than the total volume in that market


In [None]:

"""def get_income_dictionaries(H, M, L, dominant_directions, Fu_h_l, Fd_h_l, P_h_m, Vp_h_m, F, markets_dict, afrr_activation_up, afrr_activation_down):
    Ir_hlm = {} # reservation income
    Ia_hlm = {} # activation income
    Va_hm = {} # activation volume

    # Precompute values that can be determined outside the inner loop
    RK_up_prices = {}
    RK_down_prices = {}
    for area in ['NO1', 'NO2', 'NO3', 'NO4', 'NO5']:
        for hour in H:
            RK_up_prices[(area, hour)] = markets_dict["RK_up_" + area].price_data.loc[markets_dict["RK_up_" + area].price_data["Time(Local)"] == hour].values[0][1]
            RK_down_prices[(area, hour)] =  markets_dict["RK_down_" + area].price_data.loc[markets_dict["RK_down_" + area].price_data["Time(Local)"] == hour].values[0][1]

    for h, hour in enumerate(H):
        for m, market in enumerate(M):
            up_val, down_val = F[h,m]
            activation_multiplier = Vp_h_m[h,m] * (up_val + down_val) if (up_val + down_val) > 0 else 0
            is_fcr_n = "FCR_N" in market.name
            is_afrr = "aFRR" in market.name

            for l, load in enumerate(L):
                direction_match = (market.direction == "both" and load.direction == "both") or \
                                  (market.direction == "up" and load.direction != "down") or \
                                  (market.direction == "down" and load.direction != "up")

                if direction_match:
                    flex_vol = Fu_h_l[h,l] if (market.direction == "up" or (market.direction == "both" and dominant_directions[h] == "up")) else Fd_h_l[h,l]
                    Ir_hlm[h,l,m] = flex_vol * P_h_m[h,m]
                else:
                    Ir_hlm[h,l,m] = 0

                if market.capacity_market:
                    if is_fcr_n and load.direction == "both":
                        up_price = RK_up_prices[(market.area, hour)] 
                        down_price = RK_down_prices[(market.area, hour)] 
                        Ia_hlm[h,l,m] = (Fu_h_l[h,l] * up_val * up_price) + (Fd_h_l[h,l] * down_val * down_price)
                    elif is_afrr:
                        col_name = [col for col in afrr_activation_up.columns if market.area in col][0]
                        activated_volume = afrr_activation_up[col_name].loc[afrr_activation_up["Time"] == hour].values[0] 
                        Va_hm[h,m] = activated_volume
                        price = RK_up_prices[(market.area, hour)] if market.direction == "up" else RK_down_prices[(market.area, hour)]
                        Ia_hlm[h,l,m] = flex_vol * price if activated_volume > 0 else 0
                    else:
                        Ia_hlm[h,l,m] = 0
                        Va_hm[h,m] = 0
                else:
                    Ia_hlm[h,l,m] = 0
                    Va_hm[h,m] = 0
    return Ir_hlm, Ia_hlm, Va_hm
"""


In [None]:

def get_income_dictionaries_1(H, M, L, dominant_directions, Fu_h_l, Fd_h_l, P_h_m, Vp_h_m, F, markets_dict, afrr_activation_up, afrr_activation_down):
    Ir_hlm = {} # reservation income
    Ia_hlm = {} # activation income
    Va_hm = {} # activation volume

    # Precompute values that can be determined outside the inner loop
    RK_up_prices = {}
    RK_down_prices = {}
    aFRR_activation_up_volume = {}
    aFRR_activation_down_volume = {}
    for area in ['NO1', 'NO2', 'NO3', 'NO4', 'NO5']:
        for hour in H:
            RK_up_prices[(area, hour)] = markets_dict["RK_up_" + area].price_data.loc[markets_dict["RK_up_" + area].price_data["Time(Local)"] == hour].values[0][1]
            RK_down_prices[(area, hour)] =  markets_dict["RK_down_" + area].price_data.loc[markets_dict["RK_down_" + area].price_data["Time(Local)"] == hour].values[0][1]
            col_name_up = [col for col in afrr_activation_up.columns if area in col][0]
            aFRR_activation_up_volume[(area, hour)] = afrr_activation_up[col_name_up].loc[afrr_activation_up["Time"] == hour].values[0] 
            col_name_down = [col for col in afrr_activation_down.columns if area in col][0]
            aFRR_activation_down_volume[(area, hour)] = afrr_activation_down[col_name_down].loc[afrr_activation_down["Time"] == hour].values[0]
    
    for h, hour in enumerate(H):
        for m, market in enumerate(M):
            for l, load in enumerate(L):
                if market.direction == "both":
                    if load.direction == "both":
                        if dominant_directions[h] == "up":
                            Ir_hlm[h,l,m] = Fu_h_l[h,l] * P_h_m[h,m]
                        else:
                            Ir_hlm[h,l ,m] = Fd_h_l[h,l] * P_h_m[h,m]
                        #I[h,l,m] =(Fu_h_l[h,l]+ Fd_h_l[h,l])/2 * P_h_m[h,m]
                    else:
                        Ir_hlm[h,l,m] = 0
                elif market.direction == "up":
                    if load.direction != "down":
                        Ir_hlm[h,l,m] = Fu_h_l[h,l] * P_h_m[h,m]
                    else:
                        Ir_hlm[h,l,m] = 0
                else: # market.direction == "down"
                    if load.direction != "up":
                        Ir_hlm[h,l,m] = Fd_h_l[h,l] * P_h_m[h,m]
                    else:
                        Ir_hlm[h,l,m] = 0
                if market.capacity_market: 
                    if "FCR_N" in market.name:
                        up_val, down_val = F[h,m]
                        Va_hm[h,m] = Vp_h_m[h,m] * (up_val + down_val) if (up_val + down_val) > 0 else 0
                        if load.direction == "both":
                            activation_income = (Fu_h_l[h,l] * up_val * RK_up_prices[(market.area, hour)] + 
                                                Fd_h_l[h,l] * down_val * RK_down_prices[(market.area, hour)])
                            # Add to the objective expression
                            Ia_hlm[h,l,m] = activation_income
                        else:
                            Ia_hlm[h,l,m] = 0
                    elif "aFRR" in market.name: # will have to add the other markets later - especially aFRR and RKOM
                        if market.direction == "up":
                            activated_volume = aFRR_activation_up_volume[(market.area, hour)]
                            Va_hm[h,m] = activated_volume
                            if  load.direction != "down" and activated_volume > 0:
                                Ia_hlm[h,l,m] = Fu_h_l[h,l] * RK_up_prices[(market.area, hour)]
                            else:
                                Ia_hlm[h,l,m] = 0
                        elif market.direction == "down": 
                            activated_volume = aFRR_activation_down_volume[(market.area, hour)]
                            Va_hm[h,m] = activated_volume
                            if load.direction != "up" and activated_volume > 0:
                                Ia_hlm[h,l,m] = Fd_h_l[h,l] * RK_down_prices[(market.area, hour)]
                            else:
                                Ia_hlm[h,l,m] = 0
                        else:
                            Ia_hlm[h,l,m] = 0
                        
                    else: # No activation income, just regular income
                        Ia_hlm[h,l,m] = 0
                        Va_hm[h,m] = 0
                else:
                    # No capacity market, just regular income
                    Ir_hlm[h,l,m] = P_h_m[h,m] * Vp_h_m[h,m]
                    Ia_hlm[h,l,m] = 0
                    Va_hm[h,m] = 0
    return Ir_hlm, Ia_hlm, Va_hm



In [None]:
Ir_hlm, Ia_hlm, Va_hm = get_income_dictionaries_1(H, M, L, dominant_directions, Fu_h_l, Fd_h_l, P_h_m, Vp_h_m, F, markets_dict, afrr_activation_up, afrr_activation_down)


In [None]:
compatible_list = []
for h, hour in enumerate(H):
    hour_list = []
    for l, asset in enumerate(L):
        asset_list = []
        for m, market in enumerate(M):
            if asset.direction == "up":
                if market.direction == "up":
                    if market.area == asset.area or market.area == "all":
                        asset_list.append(m)
            elif asset.direction == "down":
                if market.area == asset.area or market.area == "all":
                    if market.direction == "down":
                        asset_list.append(m)
                
            elif asset.direction == "both":
                if market.area == asset.area  or market.area == "all":
                    asset_list.append(m)
        hour_list.append(asset_list)
    compatible_list.append(hour_list)



In [None]:
# Create a new model
test_model = gp.Model("AssetToMarket")

# Create decision variables
x = {}
d = {}
y = {}
w = {}
for h in range(len(H)):
    for l in range(len(L)):
        for m in range(len(M)):
            # asset i is connected to market j at hour h
            x[h, l, m] = test_model.addVar(lb = 0, ub = 1, vtype=gp.GRB.BINARY, name=f"x_{h}_{l}_{m}")

            d[h,l,m] = 1 if m in compatible_list[h][l] else 0 # compatible_list takes care of both the area constraint and the direction constraint
            
            # adding the constraint
            test_model.addConstr(x[h,l,m] <= d[h,l,m])
    for m in range(len(M)):
        # market m has a bid at hour h
        y[h, m] = test_model.addVar(lb = 0, ub = 1, vtype=gp.GRB.BINARY, name=f"y_{h}_{m}")
        # market m is activated at hour h
        w[h, m] = test_model.addVar(lb = 0, ub = 1, vtype=gp.GRB.BINARY , name=f"w_{h}_{m}")
        

# Set objective            

# Set the objective to maximize the total income expression
test_model.setObjective(sum(Ir_hlm[h,l,m] * x[h,l,m] + Ia_hlm[h,l,m] * x[h,l,m] * w[h,m] for h in range(len(H)) for l in range(len(L)) for m in range(len(M)))
                        , gp.GRB.MAXIMIZE) # can possibly remove the x on the activation income

# Add constraints
for h in range(len(H)):
    for l in range(len(L)):
        # Each asset can only be connected to one market at a time
        test_model.addConstr(sum(x[h, l, m] for m in range(len(M))) <= 1, f"single_market_for_asset_at_hour_{h}_nr.{l}")
        

    for m, market in enumerate(M):
        
        up_val, down_val = F[h,m]
        if up_val + down_val > 0:
            test_model.addConstr(w[h,m] <= y[h,m], f"market_{m}_can_not_be_activated_at_hour_{h}_if_it_is_not_active")
        else:
            test_model.addConstr(w[h,m] == 0, f"market_{m}_can_not_be_activated_at_hour_{h}_if_it_is_not_active")
        
        
        # Connect the binary variables by using big M
        #test_model.addConstr(sum(x[h, l, m] for l in range(len(L))) <= len(L) * y[h, m], f"asset_connection_for_hour_{h}_market_{m}")
     
        # Max volume constraint
        
        if market.direction == "up":
            # capacity volume constraint
            test_model.addConstr(sum(x[h, l, m] * Fu_h_l[h,l] for l in range(len(L))) <= Vp_h_m[h,m]  * y[h,m], f"max_volume_for_hour_{h}_market_{m}")
            # activation volume constraint
            test_model.addConstr(sum(x[h, l, m] * Fu_h_l[h,l] for l in range(len(L))) * w[h,m] <= Va_hm[h,m], f"max_volume_for_activation_in-_market_{m}_at_hour_{h}")
            # min volume capacity constraint
            test_model.addConstr(sum(x[h, l, m] * Fu_h_l[h,l] for l in range(len(L))) >= Vm_m[m] * y[h, m], f"min_volume_for_hour_{h}_market_{m}") 

        elif market.direction == "down":
            # max capacity volume constraint
            test_model.addConstr(sum(x[h, l, m] * Fd_h_l[h,l] for l in range(len(L))) <=  Vp_h_m[h,m]  * y[h,m], f"max_volume_for_hour_{h}_market_{m}")
            # max activation volume constraint
            test_model.addConstr(sum(x[h, l, m] * Fd_h_l[h,l] for l in range(len(L))) * w[h,m] <= Va_hm[h,m], f"max_volume_for_activation_in_market_{m}_at_hour_{h}")
            # min volume capacity constraint
            test_model.addConstr(sum(x[h, l, m] * Fd_h_l[h,l] for l in range(len(L))) >= Vm_m[m] * y[h, m], f"min_volume_for_hour_{h}_market_{m}") 

        else: # market.direction == "both"
            if dominant_directions[h] == "up":
                # max capacity volume constraint
                test_model.addConstr(sum(x[h, l, m] * Fu_h_l[h,l] for l in range(len(L))) <= Vp_h_m[h,m]  * y[h,m], f"max_volume_for_hour_{h}_market_{m}")
                # max activation volume constraint
                test_model.addConstr(sum(x[h, l, m] * Fu_h_l[h,l] for l in range(len(L))) * w[h,m] <= Va_hm[h,m] , f"max_volume_for_activation_in_market_{m}_at_hour_{h}")
                # min capacity volume constraint
                test_model.addConstr(sum(x[h, l, m] * Fu_h_l[h,l] for l in range(len(L))) >= Vm_m[m] * y[h, m], f"min_volume_for_hour_{h}_market_{m}") 

            else:
                # max capacity volume constraint
                test_model.addConstr(sum(x[h, l, m] * Fd_h_l[h,l] for l in range(len(L))) <= Vp_h_m[h,m] * y[h,m], f"max_volume_for_hour_{h}_market_{m}")
                # max activation volume constraint
                test_model.addConstr(sum(x[h, l, m] * Fd_h_l[h,l] for l in range(len(L))) * w[h,m] <= Va_hm[h,m], f"max_volume_for_activation_in_market_{m}_at_hour_{h}")
                # min capacity volume constraint
                test_model.addConstr(sum(x[h, l, m] * Fd_h_l[h,l] for l in range(len(L))) >= Vm_m[m] * y[h, m], f"min_volume_for_hour_{h}_market_{m}") 

    
        
        # The response times for loads l connected to market m cannot exceed the max response time for m
        for l in range(len(L)):
            test_model.addConstr(x[h,l,m] * R_h_l[h,l] <= R_m[m] * y[h,m], f"response_time_for_hour_{h}_market_{m}")
        
            

# Enable logging
test_model.setParam('LogFile', 'test_model.log')

# Solve the model
test_model.optimize()
    
if test_model.status == gp.GRB.Status.INFEASIBLE:
    test_model.computeIIS()


In [None]:
def test_solution_validity(x, y, L, M, H):
    for h, hour in enumerate(H):
        for l, load in enumerate(L):
            # Each asset can only be connected to one market at a time
            assert sum(x[h, l, m].X for m in range(len(M))) <= 1, f"Asset {l} connected to multiple markets at hour {h}"
            for m, market in enumerate(M):
                # Directionality constraints
                if load.direction == "up" and market.direction == "down":
                    assert x[h, l, m].X == 0, f"Up-direction asset {l} connected to down-direction market {m} at hour {h}"
                elif load.direction == "down" and market.direction == "up":
                    assert x[h, l, m].X == 0, f"Down-direction asset {l} connected to up-direction market {m} at hour {h}"
                elif market.direction == "both" and load.direction != "both":
                    assert x[h, l, m].X == 0, f"Asset {l} with specific direction connected to both-direction market {m} at hour {h}"
                elif market.area != load.area:
                    assert x[h, l, m].X == 0, f"Asset {l} in area {load.area} connected to market {m} in area {market.area} at hour {h}"
                
                # Response time constraints
                assert x[h, l, m].X * load.response_time <= market.response_time * y[h, m].X, f"Asset {l} connected to market {m} at hour {h} violates response time constraint"
                
        for m, market in enumerate(M):
            # Connect the binary variables by using big M
            assert sum(x[h, l, m].X for l in range(len(L))) <= len(L) * y[h, m].X, f"More than allowed assets connected at hour {h} to market {m}"

            #total_flex_volume = sum(x[h, l, m].X * load.flex_volume["value"].loc[load.flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L))

            # Min volume constraint
            if market.direction == "up":
                total_flex_volume = sum(x[h, l, m].X * load.up_flex_volume["value"].loc[load.up_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "down")
            elif market.direction == "down":
                total_flex_volume = sum(x[h, l, m].X * load.down_flex_volume["value"].loc[load.down_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "up")
            else: # direction = "both"
                if dominant_directions[h] == "up":
                    total_flex_volume = sum(x[h, l, m].X * load.up_flex_volume["value"].loc[load.up_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "down")
                else:
                    total_flex_volume = sum(x[h, l, m].X * load.down_flex_volume["value"].loc[load.down_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "up")
            
            assert total_flex_volume >= market.min_volume * y[h, m].X, f"Minimum volume constraint violated at hour {h} for market {m}"
            
            # Max volume constraint for both capacity and activation
            if market.direction == "up":
                total_max_volume = sum(x[h, l, m].X * load.up_flex_volume["value"].loc[load.up_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "down")

            elif market.direction == "down":
                total_max_volume = sum(x[h, l, m].X * load.down_flex_volume["value"].loc[load.down_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "up")

            else:
                """if dominant_directions[h] == "up":
                    total_max_volume = sum(x[h, l, m].X * load.up_flex_volume["value"].loc[load.up_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "down")
                else:
                    total_max_volume = sum(x[h, l, m].X * load.down_flex_volume["value"].loc[load.down_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "up")"""
                total_up_max_volume = sum(x[h, l, m].X * load.up_flex_volume["value"].loc[load.up_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "down")
                total_down_max_volume = sum(x[h, l, m].X * load.down_flex_volume["value"].loc[load.down_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "up")
                up_frac, down_frac = F[h,m]
                total_max_volume = (total_up_max_volume * up_frac + total_down_max_volume * down_frac)
            
             # Assert the constraints
            activation_constraint = total_max_volume  * w[h,m].X <= Va_hm[h,m]
            assert activation_constraint, f"Activation constraint violated for hour {h}, market {m}"
            market_max_volume = market.volume_data.loc[market.volume_data["Time(Local)"] == hour].values[0][1]
            assert total_max_volume <= market_max_volume * y[h,m].X, f"Maximum volume constraint violated at hour {h} for market {m}"





In [None]:
test_solution_validity(x, y, L, M, H)

In [None]:
def get_market_count_dict(x):
    data = []

    for h, hour in enumerate(H):
        for l, load in enumerate(L):
            for m, market in enumerate(M):
                if x[h, l, m] > 0.5:
                    # Calculate flex volume for this asset, market, and hour
        
                    data.append([hour, load.meter_id, market.name])

    df = pd.DataFrame(data, columns=["Hour", "Asset Meter ID", "Market"])
    market_names = [m.name for m in M]
    market_count_dict = {}
    for h, hour in enumerate(H):
        hour_df = df.loc[(df["Hour"] == hour)]
        # Aggregate data by market and hour, counting assets and summing flex volumes
        market_count = hour_df.groupby(["Market", "Hour"]).agg({"Asset Meter ID": "count"}).reset_index().rename(columns={"Asset Meter ID": "Asset Count"})
        flex_volumes = []
        for market_name in market_count["Market"]:
            m = market_names.index(market_name)
            market = M[m]
            if market.direction == "up":
                total_flex_volume = sum(x[h, l, m] * load.up_flex_volume["value"].loc[load.up_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "down")
            elif market.direction == "down":
                total_flex_volume = sum(x[h, l, m] * load.down_flex_volume["value"].loc[load.down_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "up")
            else:
                if dominant_directions[h] == "up":
                    total_flex_volume = sum(x[h, l, m] * load.up_flex_volume["value"].loc[load.up_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "down")
                else:
                    total_flex_volume = sum(x[h, l, m] * load.down_flex_volume["value"].loc[load.down_flex_volume["Time(Local)"] == hour].values[0] for l, load in enumerate(L) if load.direction != "up")
                
            flex_volumes.append(total_flex_volume)
        market_count["Total Flex Volume"] = flex_volumes
        market_count_dict[hour] = market_count
    return market_count_dict

In [None]:
# Extract binary variable values from the original model
new_x_values = {(h, l, m): test_model.getVarByName(f"x_{h}_{l}_{m}").X for h in range(len(H)) for l in range(len(L)) for m in range(len(M))}


In [None]:
# Load the saved values
with open('current_x_values.pkl', 'rb') as f:
    original_x_values = pickle.load(f)

old_dict = get_market_count_dict(original_x_values)
new_dict = get_market_count_dict(new_x_values)

differences = {}
for key in old_dict:
    if not old_dict[key].equals(new_dict[key]):
        differences[key] = (new_dict[key], old_dict[key])
        

for key, (orig_val, mod_val) in differences.items():
    print(f"Difference for hour {key}: \n Original={display(orig_val)}, \n  Modified={display(mod_val)}")


# Compare the values
"""differences = {}
for key in original_x_values:
    if original_x_values[key] != new_x_values[key]:
        differences[key] = (original_x_values[key], new_x_values[key])

# Print differences
for key, (orig_val, mod_val) in differences.items():
    print(f"Difference in variable x{key}: Original={orig_val}, Modified={mod_val}")"""

In [None]:
# Extract binary variable values from the original model
current_x_values = {(h, l, m): test_model.getVarByName(f"x_{h}_{l}_{m}").X for h in range(len(H)) for l in range(len(L)) for m in range(len(M))}

# Save these values
with open('current_x_values.pkl', 'wb') as f:
    pickle.dump(current_x_values, f)
                